"""Molecular orbitals calculation using CP2K and `QMFlows <https://github.com/SCM-NV/qmflows>`.
API
---
.. autofunction:: calculate_mos
"""
from __future__ import annotations
import fnmatch
import os
from os.path import join
from typing import NamedTuple, Tuple, Union
import numpy as np
from noodles import gather, schedule
from qmflows.common import CP2KInfoMO
from qmflows.type_hints import PathLike, PromisedObject
from qmflows.warnings_qmflows import SCF_Convergence_Warning
from .. import logger
from .. import _data
from ..common import Matrix, is_data_in_hdf5, read_cell_parameters_as_array, store_arrays_in_hdf5
from .scheduleCP2K import prepare_job_cp2k
__all__ = ["calculate_mos", "create_point_folder", "split_file_geometries"]
#: Molecular orbitals from both restricted and unrestricted calculations
OrbitalType = Union[CP2KInfoMO, Tuple[CP2KInfoMO, CP2KInfoMO]]
class JobFiles(NamedTuple):
"""Contains the data to compute the molecular orbitals for a given geometry."""
get_xyz: PathLike
get_inp: PathLike
get_out: PathLike
get_MO: PathLike
[docs]
def calculate_mos(config: _data.GeneralOptions) -> PromisedObject:
"""Look for the MO in the HDF5 file and compute them if they are not present.
The orbitals are computed by splitting the jobs in batches given by
the ``restart_chunk`` variables. Only the first job is calculated from scratch
while the rest of the batch uses as guess the wave function of the first calculation
inthe batch.
The config dict contains:
* geometries: list of molecular geometries
* project_name: Name of the project used as root path for storing data in HDF5.
* path_hdf5: Path to the HDF5 file that contains the numerical results.
* folders: path to the directories containing the MO outputs
* settings_main: Settings for the job to run.
* calc_new_wf_guess_on_points: Calculate a new Wave function guess in
each of the geometries indicated. By Default only an initial guess is computed.
* enumerate_from: Number from where to start enumerating the folders create for
each point in the MD
Returns
-------
paths to the datasets in the HDF5 file containging both the MO energies and MO coefficients
"""
# Read Cell parameters file
general = config.cp2k_general_settings
file_cell_parameters = general.file_cell_parameters
if file_cell_parameters is not None:
array_cell_parameters = read_cell_parameters_as_array(file_cell_parameters)[
1]
# First calculation has no initial guess
# calculate the rest of the jobs using the previous point as initial guess
# list to the nodes in the HDF5 containing the MOs
orbitals: list[None | tuple[str, str, str]] = []
energies = []
guess_job = None
# orbital type is either an empty string for restricted calculation
# or alpha/beta for unrestricted calculations
orbitals_type = config.orbitals_type
for j, gs in enumerate(config.geometries):
# number of the point with respect to all the trajectory
k = j + config.enumerate_from
# dictionary containing the information of the j-th job
dict_input = _data.ComponentsData(
geometry=gs,
k=k,
node_MOs=( # Path where the MOs will be store in the HDF5
join(orbitals_type, "eigenvalues", f"point_{k}"),
join(orbitals_type, "coefficients", f"point_{k}"),
join(orbitals_type, "occupation", f"point_{k}"),
),
node_energy=join(orbitals_type, "energy", f"point_{k}"),
)
# If the MOs are already store in the HDF5 format return the path
# to them and skip the calculation
if config.compute_orbitals:
predicate: str | tuple[str, str, str] = dict_input.node_MOs
else:
predicate = dict_input.node_energy
if is_data_in_hdf5(config.path_hdf5, predicate):
logger.info(f"point_{k} has already been calculated")
orbitals.append(dict_input.node_MOs)
else:
logger.info(f"point_{k} has been scheduled")
# Add cell parameters from file if given
if file_cell_parameters is not None:
adjust_cell_parameters(general, array_cell_parameters, j)
# Path to I/O files
dict_input.point_dir = config.folders[j]
dict_input.job_files = create_file_names(dict_input.point_dir, k)
dict_input.job_name = f'point_{k}'
# Compute the MOs and return a new guess
promise_qm = compute_orbitals(config, dict_input, guess_job)
# Check if the job finishes succesfully
promise_qm = schedule_check(promise_qm, config, dict_input)
# Store the computation
if config.compute_orbitals:
orbitals.append(store_molecular_orbitals(config, dict_input, promise_qm))
else:
orbitals.append(None)
energies.append(store_enery(config, dict_input, promise_qm))
guess_job = promise_qm
return gather(gather(*orbitals), gather(*energies))
@schedule
def store_molecular_orbitals(
config: _data.GeneralOptions,
dict_input: _data.ComponentsData,
promise_qm: PromisedObject,
) -> tuple[str, str, str]:
"""Store the MOs in the HDF5.
Returns
-------
str
Node path in the HDF5
"""
# Molecular Orbitals
mos = promise_qm.orbitals
# Store in the HDF5
try:
save_orbitals_in_hdf5(mos, config, dict_input.job_name)
# Remove the ascii MO file
finally:
if config.remove_log_file:
work_dir = promise_qm.archive['work_dir']
path_mos = fnmatch.filter(os.listdir(work_dir), 'mo_*MOLog')[0]
os.remove(join(work_dir, path_mos))
return dict_input.node_MOs
def save_orbitals_in_hdf5(mos: OrbitalType, config: _data.GeneralOptions, job_name: str) -> None:
"""Store the orbitals from restricted and unrestricted calculations."""
if isinstance(mos, CP2KInfoMO):
dump_orbitals_to_hdf5(mos, config, job_name)
else:
alphas, betas = mos
dump_orbitals_to_hdf5(alphas, config, job_name, "alphas")
dump_orbitals_to_hdf5(betas, config, job_name, "betas")
def dump_orbitals_to_hdf5(
data: CP2KInfoMO,
config: _data.GeneralOptions,
job_name: str,
orbitals_type: str = "",
) -> None:
"""Store the result in HDF5 format.
Parameters
----------
data
Tuple of energies and coefficients of the molecular orbitals
config
Dictionary with the job configuration
job_name
Name of the current job
orbitals_type
Either an empty string for MO coming from a restricted job or alpha/beta
for unrestricted MO calculation
"""
for name, array in zip(("eigenvalues", "coefficients"), (data.eigenvalues, data.eigenvectors)):
path_property = join(orbitals_type, name, job_name)
store_arrays_in_hdf5(config.path_hdf5, path_property, array)
# Store the number of occupied and virtual orbitals as a size-2 dataset.
# Occupied in this context is equivalent to "non-zero occupation"
path_property = join(orbitals_type, "occupation", job_name)
occ_array = np.array(data.get_nocc_nvirt(), dtype=np.int64)
store_arrays_in_hdf5(config.path_hdf5, path_property, occ_array, dtype=np.int64)
@schedule
def store_enery(
config: _data.GeneralOptions,
dict_input: _data.ComponentsData,
promise_qm: PromisedObject,
) -> str:
"""Store the total energy in the HDF5 file.
Returns
-------
str
Node path to the energy in the HDF5
"""
store_arrays_in_hdf5(config.path_hdf5, dict_input.node_energy, promise_qm.energy)
logger.info(f"Total energy of point {dict_input.k} is: {promise_qm.energy}")
return dict_input.node_energy
def compute_orbitals(
config: _data.GeneralOptions,
dict_input: _data.ComponentsData,
guess_job: None | PromisedObject,
) -> PromisedObject:
"""Call a Quantum chemisty package to compute the MOs.
When finish store the MOs in the HdF5 and returns a new guess.
"""
dict_input.job_files = create_file_names(dict_input.point_dir, dict_input.k)
# Calculating initial guess
compute_guess = config.calc_new_wf_guess_on_points is not None
# A job is a restart if guess_job is None and the list of
# wf guesses are not empty
is_restart = guess_job is None and compute_guess
pred = (dict_input.k in config.calc_new_wf_guess_on_points) or is_restart
general = config.cp2k_general_settings
if pred:
guess_job = prepare_job_cp2k(general.cp2k_settings_guess, dict_input, guess_job)
return prepare_job_cp2k(general.cp2k_settings_main, dict_input, guess_job)
@schedule
def schedule_check(
promise_qm: PromisedObject,
config: _data.GeneralOptions,
dict_input: _data.ComponentsData,
) -> PromisedObject:
"""Check wether a calculation finishes succesfully otherwise run a new guess."""
job_name = dict_input.job_name
point_dir = dict_input.point_dir
# Warnings of the computation
warnings = promise_qm.warnings
# Check for SCF convergence errors
if not config.ignore_warnings and warnings is not None and any(
w == SCF_Convergence_Warning for msg, w in warnings.items()):
# Report the failure
msg = f"Job: {job_name} Finished with Warnings: {warnings}"
logger.warning(msg)
# recompute a new guess
msg1 = f"Computing a new wave function guess for job: {job_name}"
logger.warning(msg1)
# Remove the previous ascii file containing the MOs
msg2 = f"removing file containig the previous failed MOs of {job_name}"
logger.warning(msg2)
path = fnmatch.filter(os.listdir(point_dir), 'mo*MOLog')[0]
os.remove(join(point_dir, path))
# Compute new guess at point k
config.calc_new_wf_guess_on_points.append(dict_input.k)
return compute_orbitals(config, dict_input, None)
else:
return promise_qm
def create_point_folder(
work_dir: str | os.PathLike[str], n: int, enumerate_from: int) -> list[str]:
"""Create a new folder for each point in the MD trajectory."""
folders = []
for k in range(enumerate_from, n + enumerate_from):
new_dir = join(work_dir, f'point_{k}')
os.makedirs(new_dir, exist_ok=True)
folders.append(new_dir)
return folders
def split_file_geometries(path_xyz: PathLike) -> list[str]:
"""Read a set of molecular geometries in xyz format."""
# Read Cartesian Coordinates
with open(path_xyz, "r", encoding="utf8") as f:
xss = iter(f.readlines())
data = []
while True:
try:
natoms = int(next(xss).split()[0])
molecule = "".join([next(xss) for _ in range(natoms + 1)])
data.append(f"{natoms}\n{molecule}")
except StopIteration:
break
return data
def create_file_names(work_dir: str | os.PathLike[str], i: int) -> JobFiles:
"""Create a namedTuple with the name of the 4 files used for each point in the trajectory."""
file_xyz = join(work_dir, f'coordinates_{i}.xyz')
file_inp = join(work_dir, f'point_{i}.inp')
file_out = join(work_dir, f'point_{i}.out')
file_mo = join(work_dir, f'mo_coeff_{i}.out')
return JobFiles(file_xyz, file_inp, file_out, file_mo)
def adjust_cell_parameters(
general: _data.CP2KGeneralSetting,
array_cell_parameters: Matrix,
j: int,
) -> None:
"""Adjust the cell parameters on the fly.
If the cell parameters change during the MD simulations, adjust them
for the molecular orbitals computation.
"""
for s in (general.cp2k_settings_main, general.cp2k_settings_guess):
s.cell_parameters = array_cell_parameters[j, 2:11].reshape(3, 3).tolist()
s.cell_angles = None