Source code for qvartools._ext.sbd_subprocess

"""Subprocess-based wrapper for r-ccs-cms/sbd GPU-native diagonalization.

Phase 1 integration: calls sbd as an external process via FCIDUMP files.
This will be replaced by nanobind bindings in Phase 2 (ADR-003).

Usage::

    from qvartools._ext.sbd_subprocess import sbd_diagonalize
    energy = sbd_diagonalize(
        h1e, h2e, n_orb, n_elec, nuclear_repulsion,
        alpha_strings, beta_strings,
    )
"""

from __future__ import annotations

import logging
import math
import os
import shutil
import subprocess
import tempfile

import numpy as np
import torch

logger = logging.getLogger(__name__)


def _find_sbd_binary() -> str | None:
    """Find the sbd diag binary."""
    env_binary = os.environ.get("QVARTOOLS_SBD_BINARY", "")
    if env_binary and os.path.isfile(env_binary) and os.access(env_binary, os.X_OK):
        return env_binary
    for path in [
        os.path.expanduser("~/.local/bin/sbd-diag"),
        "/usr/local/bin/sbd-diag",
    ]:
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path
    return None


def _find_mpirun() -> str | None:
    """Find mpirun executable."""
    return shutil.which("mpirun")


[docs] def sbd_available() -> bool: """Check if sbd binary and mpirun are both available.""" return _find_sbd_binary() is not None and _find_mpirun() is not None
def _write_fcidump( path: str, h1e: np.ndarray, h2e: np.ndarray, n_orb: int, n_elec: int, nuclear_repulsion: float, ms2: int = 0, ) -> None: """Write molecular integrals in FCIDUMP format. Uses ``pyscf.tools.fcidump`` when available (fastest, handles all symmetry reduction internally). Falls back to a vectorised numpy implementation that avoids the O(n⁴) Python loop. """ try: from pyscf.tools import fcidump as pyscf_fcidump pyscf_fcidump.from_integrals( path, h1e, h2e, n_orb, n_elec, nuc=nuclear_repulsion, ms=ms2 ) return except ImportError: pass # Vectorised fallback: generate all symmetry-reduced index pairs at once with open(path, "w") as f: f.write(f" &FCI NORB={n_orb:3d},NELEC={n_elec},MS2={ms2},\n") f.write(" ORBSYM=" + ",".join(["1"] * n_orb) + ",\n") f.write(" ISYM=1,\n") f.write(" &END\n") # Two-electron integrals: (p,q) with q<=p, (r,s) with s<=r, pq>=rs pq_p, pq_q = np.tril_indices(n_orb) pq_idx = pq_p * (pq_p + 1) // 2 + pq_q n_pairs = len(pq_p) pq_all = np.repeat(np.arange(n_pairs), n_pairs) rs_all = np.tile(np.arange(n_pairs), n_pairs) mask = pq_idx[pq_all] >= pq_idx[rs_all] pq_sel, rs_sel = pq_all[mask], rs_all[mask] p, q = pq_p[pq_sel], pq_q[pq_sel] r, s = pq_p[rs_sel], pq_q[rs_sel] vals = h2e[p, q, r, s] nz = np.abs(vals) > 1e-12 for v, pi, qi, ri, si in zip( vals[nz], p[nz] + 1, q[nz] + 1, r[nz] + 1, s[nz] + 1 ): f.write(f"{v:23.16e} {pi:4d} {qi:4d} {ri:4d} {si:4d}\n") # One-electron integrals: q<=p h1_p, h1_q = np.tril_indices(n_orb) h1_vals = h1e[h1_p, h1_q] h1_nz = np.abs(h1_vals) > 1e-12 for v, pi, qi in zip(h1_vals[h1_nz], h1_p[h1_nz] + 1, h1_q[h1_nz] + 1): f.write(f"{v:23.16e} {pi:4d} {qi:4d} {0:4d} {0:4d}\n") f.write(f"{nuclear_repulsion:23.16e} {0:4d} {0:4d} {0:4d} {0:4d}\n") def _write_bitstrings(path: str, strings: torch.Tensor) -> None: """Write occupation strings as bitstring file (right-to-left ordering).""" arr = strings.detach().cpu().numpy() with open(path, "w") as f: for row in arr: bits = "".join(str(int(b)) for b in row[::-1]) f.write(bits + "\n")
[docs] def sbd_diagonalize( h1e: np.ndarray, h2e: np.ndarray, n_orb: int, n_elec: int, nuclear_repulsion: float, alpha_strings: torch.Tensor, beta_strings: torch.Tensor, *, ms2: int = 0, tolerance: float = 1e-8, max_iterations: int = 50, n_threads: int = 4, timeout: int = 600, ) -> float: """Run sbd diagonalization via subprocess. Parameters ---------- h1e : np.ndarray One-electron integrals, shape ``(n_orb, n_orb)``. h2e : np.ndarray Two-electron integrals, shape ``(n_orb, n_orb, n_orb, n_orb)``. n_orb : int Number of spatial orbitals. n_elec : int Total number of electrons. nuclear_repulsion : float Nuclear repulsion energy. alpha_strings : torch.Tensor Unique alpha occupation strings, shape ``(n_alpha, n_orb)``. beta_strings : torch.Tensor Unique beta occupation strings, shape ``(n_beta, n_orb)``. ms2 : int, optional Twice the spin projection (default ``0`` for singlet). tolerance : float, optional Davidson convergence tolerance. max_iterations : int, optional Maximum Davidson iterations. n_threads : int, optional Number of OpenMP threads per MPI rank (default ``4``). Passed via ``mpirun -x OMP_NUM_THREADS``. The subprocess runs with a single MPI rank (``-np 1``). timeout : int, optional Subprocess timeout in seconds (default ``600``). Returns ------- float Ground state energy (electronic + nuclear repulsion). Raises ------ ValueError If integral shapes are inconsistent with ``n_orb``, or if ``alpha_strings``/``beta_strings`` have wrong rank or column count. RuntimeError If the sbd binary or mpirun is not found, the subprocess fails, or the energy cannot be parsed from the output. """ # --- Input validation --- h1e = np.asarray(h1e) h2e = np.asarray(h2e) if h1e.ndim != 2 or h1e.shape != (n_orb, n_orb): raise ValueError(f"h1e must have shape ({n_orb}, {n_orb}), got {h1e.shape}") if h2e.ndim != 4 or h2e.shape != (n_orb, n_orb, n_orb, n_orb): raise ValueError( f"h2e must have shape ({n_orb}, {n_orb}, {n_orb}, {n_orb}), got {h2e.shape}" ) for name, arr in [("alpha_strings", alpha_strings), ("beta_strings", beta_strings)]: if arr.ndim != 2 or arr.shape[1] != n_orb: raise ValueError( f"{name} must have shape (n_strings, {n_orb}), got {tuple(arr.shape)}" ) if arr.is_floating_point(): raise ValueError(f"{name} must be integer dtype, got {arr.dtype}") binary = _find_sbd_binary() if binary is None: raise RuntimeError( "sbd binary not found. Set QVARTOOLS_SBD_BINARY env var " "or compile from https://github.com/r-ccs-cms/sbd" ) mpirun = _find_mpirun() if mpirun is None: raise RuntimeError("mpirun not found in PATH. Install OpenMPI.") with tempfile.TemporaryDirectory(prefix="qvartools_sbd_") as tmpdir: fcidump_path = os.path.join(tmpdir, "fcidump.txt") alpha_path = os.path.join(tmpdir, "alpha.txt") beta_path = os.path.join(tmpdir, "beta.txt") _write_fcidump( fcidump_path, h1e, h2e, n_orb, n_elec, nuclear_repulsion, ms2=ms2 ) _write_bitstrings(alpha_path, alpha_strings) _write_bitstrings(beta_path, beta_strings) cmd = [mpirun] if hasattr(os, "getuid") and os.getuid() == 0: cmd.append("--allow-run-as-root") cmd += [ "-np", "1", "-x", f"OMP_NUM_THREADS={n_threads}", binary, "--fcidump", fcidump_path, "--adetfile", alpha_path, "--bdetfile", beta_path, "--method", "0", "--block", "10", "--iteration", str(max_iterations), "--tolerance", str(tolerance), "--init", "0", "--rdm", "0", ] logger.debug("sbd command: %s", " ".join(cmd)) result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) if result.returncode != 0: raise RuntimeError( f"sbd failed (exit {result.returncode})\n" f"Command: {' '.join(cmd)}\n" f"Stdout: {(result.stdout or '')[:500]}\n" f"Stderr: {(result.stderr or '')[:500]}" ) # Parse the LAST "Energy =" line (final result, not intermediate) # Take only the first token after "Energy =" to tolerate units/extra text energy = None for line in result.stdout.splitlines(): if "Energy =" in line: field = line.split("Energy =", 1)[1].strip() if not field: continue try: energy = float(field.split()[0]) except ValueError: continue if energy is not None: if not math.isfinite(energy): raise RuntimeError(f"sbd returned non-finite energy: {energy}") return energy raise RuntimeError( f"Could not parse energy from sbd output:\n{result.stdout[:1000]}" )