Skip to content

vasp_manager

ASCII_LOGO = format(version('vasp_manager'))

logger module-attribute

logger = getLogger(__name__)

VaspManager

VaspManager(
    calculation_types: list[CalculationType],
    material_dirs: Filepaths | WorkingDirectory,
    to_rerun: bool = True,
    to_submit: bool = True,
    ignore_personal_errors: bool = True,
    tail: int = 5,
    use_multiprocessing: bool = False,
    ncore: None | int = None,
    calculation_manager_kwargs: dict | None = None,
    max_reruns: int = 3,
    magmom_per_atom_cutoff: float = 0.0,
    sort_by: Callable[[str], str] = str,
)

Handles set up and execution of each CalculationManager (rlx-coarse, rlx, static, bulkmod, elastic)

Parameters:

  • calculation_types (list[CalculationType]) –

    list of calculation types

  • material_dirs (Filepaths | WorkingDirectory) –

    list of material directory paths or name of calculations directory

  • to_rerun (bool, default: True ) –

    if True, rerun failed calculations

  • to_submit (bool, default: True ) –

    if True, submit calculations

  • ignore_personal_errors (bool, default: True ) –

    if True, ignore job submission errors if on personal computer

  • tail (int, default: 5 ) –

    number of last lines from stdout.txt to log in debugging if job failed

  • use_multiprocessing (bool, default: False ) –

    if True, use pool.map()

  • ncore (None | int, default: None ) –

    if set, use {ncore} processes for multiprocessing; if None, defaults to minimum(number of materials, 4)

  • calculation_manager_kwargs (dict | None, default: None ) –

    contains subdictionaries for each calculation type. Each subdictionary can be filled with extra kwargs to pass to its associated CalculationManager during instantiation

  • max_reruns (int, default: 3 ) –

    the maximum number of times a rlx-coarse or rlx calculation can run before refusing to continue. Note: other modes don't make archives, so they are not affected by this

  • magmom_per_atom_cutoff (float, default: 0.0 ) –

    calculations that result in magmom_per_atom less than this parameter will be automatically rerun without spin-polarization

  • sort_by (Callable[[str], str], default: str ) –

    function to sort the keys of the result dictionary

Source code in vasp_manager/vasp_manager.py
def __init__(
    self,
    calculation_types: list[CalculationType],
    material_dirs: Filepaths | WorkingDirectory,
    to_rerun: bool = True,
    to_submit: bool = True,
    ignore_personal_errors: bool = True,
    tail: int = 5,
    use_multiprocessing: bool = False,
    ncore: None | int = None,
    calculation_manager_kwargs: dict | None = None,
    max_reruns: int = 3,
    magmom_per_atom_cutoff: float = 0.0,
    sort_by: Callable[[str], str] = str,
) -> None:
    """
    Store the workflow configuration and build the calculation managers

    Prints ASCII_LOGO, records every argument on the instance, creates the
    calculation managers via _get_all_calculation_managers(), and points
    results_path at "results.json" inside base_dir.

    Args:
        calculation_types: list of calculation types
        material_dirs: list of material directory paths or name of
            calculations directory
        to_rerun: if True, rerun failed calculations
        to_submit: if True, submit calculations
        ignore_personal_errors: if True, ignore job submission errors if on
            personal computer
        tail: number of last lines from stdout.txt to log in debugging
            if job failed
        use_multiprocessing: if True, use pool.map()
        ncore: if set, use {ncore} processes for multiprocessing;
            if None, defaults to minimum(number of materials, 4)
        calculation_manager_kwargs: contains subdictionaries for each
            calculation type. Each subdictionary can be filled with extra
            kwargs to pass to its associated CalculationManager during
            instantiation
        max_reruns: the maximum number of times a rlx-coarse or rlx
            calculation can run before refusing to continue
            Note: other modes don't make archives, so they are not affected
            by this
        magmom_per_atom_cutoff: calculations that result in
            magmom_per_atom less than this parameter will be automatically
            rerun without spin-polarization
        sort_by: function to sort the keys of the result dictionary
    """
    print(ASCII_LOGO)
    # NOTE(review): calculation_types, material_dirs, ncore,
    # calculation_manager_kwargs and results are exposed as writable
    # properties, so the assignments below presumably run setter logic that
    # may rely on attributes assigned earlier -- confirm before reordering
    self.sort_by = sort_by
    self.calculation_types = calculation_types
    self.material_dirs = material_dirs
    self.to_rerun = to_rerun
    self.to_submit = to_submit
    self.ignore_personal_errors = ignore_personal_errors
    self.tail = tail
    self.use_multiprocessing = use_multiprocessing
    self.ncore = ncore
    # A missing (None) or empty kwargs dict is replaced by a fresh empty dict
    self.calculation_manager_kwargs = (
        calculation_manager_kwargs if calculation_manager_kwargs else {}
    )
    self.max_reruns = max_reruns
    self.magmom_per_atom_cutoff = magmom_per_atom_cutoff

    # Built only after all configuration attributes above are in place
    self.calculation_managers = self._get_all_calculation_managers()
    # self.base_dir is set in material_dirs.setter
    self.results_path = self.base_dir / "results.json"
    # Goes through the results property setter (see note above)
    self.results = None

calculation_manager_kwargs property writable

calculation_manager_kwargs: dict

calculation_managers instance-attribute

calculation_managers = _get_all_calculation_managers()

calculation_types property writable

calculation_types: list[CalculationType]

ignore_personal_errors instance-attribute

ignore_personal_errors = ignore_personal_errors

magmom_per_atom_cutoff instance-attribute

magmom_per_atom_cutoff = magmom_per_atom_cutoff

material_dirs property writable

material_dirs: Filepaths

material_names cached property

material_names: list[str]

max_reruns instance-attribute

max_reruns = max_reruns

ncore property writable

ncore: int

results property writable

results: dict

results_path instance-attribute

results_path = base_dir / 'results.json'

sort_by instance-attribute

sort_by = sort_by

tail instance-attribute

tail = tail

to_rerun instance-attribute

to_rerun = to_rerun

to_submit instance-attribute

to_submit = to_submit

use_multiprocessing instance-attribute

use_multiprocessing = use_multiprocessing

run_calculations

run_calculations() -> dict

Runs vasp job workflow for all materials

Source code in vasp_manager/vasp_manager.py
def run_calculations(self) -> dict:
    """
    Run the vasp job workflow for every material and save the outcome

    Each (material name, material result) pair produced by
    _manage_calculations_wrapper() is merged into self.results, which is
    then serialized as JSON, logged at debug level, and written to
    self.results_path.

    Returns:
        the updated self.results
    """
    for name, outcome in self._manage_calculations_wrapper():
        self.results[name].update(outcome)

    # NumpyEncoder is handed to json.dumps as the encoder class
    serialized = json.dumps(self.results, indent=2, cls=NumpyEncoder)
    logger.debug(serialized)
    with open(self.results_path, "w+") as results_file:
        results_file.write(serialized)
    print(f"Dumped to {self.results_path}")
    return self.results

summary

summary(
    as_string: bool = True,
    print_unfinished: bool = False,
    print_stopped: bool = True,
) -> str | dict

Create a string summary of all calculations

Parameters:

  • as_string (bool, default: True ) –

    if True, return string summary. Else, return dict summary

  • print_unfinished (bool, default: False ) –

    if True, include a list of unfinished materials for each calculation type in the summary

  • print_stopped (bool, default: True ) –

    if True, include a list of stopped materials for each calculation type in the summary

Source code in vasp_manager/vasp_manager.py
def summary(
    self,
    as_string: bool = True,
    print_unfinished: bool = False,
    print_stopped: bool = True,
) -> str | dict:
    """
    Create a string summary of all calculations

    Args:
        as_string: if True, return string summary. Else, return dict summary
        print_unfinished: if True, include a list of unfinished
            materials for each calculation type in the summary
        print_stopped: if True, include a list of stopped materials for
            each calculation type in the summary
    """
    if not self.results_path.exists():
        raise ValueError(f"Can't find results in {self.results_path}")
    with open(self.results_path) as fr:
        results = json.load(fr)

    summary_dict: dict[str, Any] = {}
    summary_dict["n_total"] = len(results)
    for calc_type in self.calculation_types:
        summary_dict[calc_type] = {}
        summary_dict[calc_type]["n_finished"] = 0
        summary_dict[calc_type]["finished"] = []
        summary_dict[calc_type]["unfinished"] = []
        summary_dict[calc_type]["stopped"] = []

        for material, mat_results in results.items():
            # need to account for case key doesn't yet exist
            if calc_type not in mat_results:
                summary_dict[calc_type]["unfinished"].append(material)
            else:
                is_done, is_stopped = self._check_calc_by_result(material, calc_type)
                if is_done:
                    summary_dict[calc_type]["n_finished"] += 1
                    summary_dict[calc_type]["finished"].append(material)
                else:
                    if is_stopped:
                        summary_dict[calc_type]["stopped"].append(material)
                    summary_dict[calc_type]["unfinished"].append(material)

    if as_string:
        n_materials = summary_dict["n_total"]
        summary_str = ""
        summary_str += f"Total Materials = {n_materials}\n"
        summary_str += "-" * 30 + "\n"
        for calc_type in self.calculation_types:
            name = calc_type.upper()
            n_finished = summary_dict[calc_type]["n_finished"]
            summary_str += f"{name: <12}{n_finished}/{n_materials} completed\n"
            if print_unfinished:
                unfinished = summary_dict[calc_type]["unfinished"]
                if len(unfinished) != 0:
                    summary_str += (
                        " " * 12 + f"{n_materials - n_finished} not completed\n"
                    )
                    summary_str += f"Unfinished {calc_type.upper()}: {unfinished}\n"
            if print_stopped:
                stopped = summary_dict[calc_type]["stopped"]
                if len(stopped) != 0:
                    summary_str += f"\tStopped {calc_type.upper()}: {stopped}\n"
        return summary_str
    else:
        return summary_dict