"""
Provides functions for submitting models as jobs
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
from __future__ import annotations
import errno
import itertools
import json
import logging
import os
import shlex
import subprocess as sp
import sys
import warnings
from pathlib import Path
from typing import Any, Iterable, Literal
from tqdm.auto import tqdm
from .. import Parameter
from ..config import Config
def escape_string(obj) -> str:
    """quote an object so it can be used as a single command line argument

    Args:
        obj:
            The object to escape. It is converted to a string first.

    Returns:
        str: The shell-escaped string representation of `obj`
    """
    text = str(obj)
    return shlex.quote(text)
def ensure_directory_exists(folder: str | Path) -> None:
    """creates a folder if it not already exists

    Missing parent directories are created as well.

    Args:
        folder (str or :class:`~pathlib.Path`):
            Path of the directory that should exist. An empty string is ignored.

    Raises:
        FileExistsError: If `folder` exists but is not a directory
        OSError: If the directory cannot be created for another reason
    """
    if folder == "":
        # nothing to create for an empty path
        return
    # `exist_ok=True` accepts an already existing directory, but still raises when
    # the path exists as something else (e.g., a regular file), which would
    # otherwise go unnoticed until the folder is actually used
    os.makedirs(folder, exist_ok=True)
# Default settings for submitting jobs. This list is handed to `Config` in
# `get_config`, where it can be overwritten by user-defined settings.
DEFAULT_CONFIG = [
    # python interpreter used by the job; the placeholder string is replaced by the
    # actual value of `sys.executable` in `submit_job`
    Parameter(
        "python_bin",
        "sys.executable",
        str,
        "Path to the python executable to be used. The special value `sys.executable` "
        "uses the value returned by `sys.executable`.",
    ),
    Parameter("num_threads", 1, int, "The number of threads to be used"),
    # submission method used when `submit_job` is called with `method="auto"`
    Parameter(
        "method",
        "qsub",
        str,
        "The job submission method.",
        choices=["background", "foreground", "qsub", "srun"],
    ),
    Parameter(
        "partition", "", str, "The partition to which the job will be submitted."
    ),
]
def get_config(
    config: str | dict[str, Any] | None = None, *, load_user_config: bool = True
) -> Config:
    """assemble the configuration used for submitting jobs

    The configuration starts from `DEFAULT_CONFIG`, is then updated by the optional
    user file, and finally by the explicitly given `config`.

    Args:
        config (str or dict):
            Settings that overwrite the defaults. A string is interpreted as a
            JSON-encoded dictionary.
        load_user_config (bool):
            Determines whether the file `~/.modelrunner` is loaded (if it exists) to
            provide user-defined settings.

    Returns:
        :class:`~modelrunner.config.Config`: the established configuration
    """
    settings = Config(DEFAULT_CONFIG)

    if load_user_config:
        user_file = Path.home() / ".modelrunner"
        if user_file.is_file():
            settings.load(user_file)

    if config is None:
        return settings

    # explicit settings take precedence over everything loaded so far
    overrides = json.loads(config) if isinstance(config, str) else config
    settings.update(overrides)
    return settings
def _format_job_value(value: Any) -> str:
    """convert a parameter value into a compact string suitable for a job name"""
    if isinstance(value, str):
        # strings are iterable and do not support the `g` format code, so they
        # are used verbatim
        return value
    if hasattr(value, "__iter__"):
        return "_".join(_format_job_value(item) for item in value)
    try:
        # compact representation of numbers, e.g., `0.5` or `1e+06`
        return f"{value:g}"
    except (TypeError, ValueError):
        # objects that do not support the `g` format code (e.g., `None`)
        return str(value)


def get_job_name(base: str, args: dict[str, Any] | None = None, length: int = 7) -> str:
    """create a suitable job name

    Each parameter contributes `_{NAME}_{value}` to the name, where `NAME` is the
    upper-case parameter name without underscores, truncated to `length` characters.
    Numbers are formatted with the `g` format code, strings are used as they are, and
    the items of iterable values are joined by underscores.

    Args:
        base (str):
            The stem of the job name. A single trailing underscore is removed.
        args (dict):
            Parameters to include in the job name
        length (int):
            Length of the abbreviated parameter name

    Returns:
        str: A suitable job name
    """
    if args is None:
        args = {}

    parts = [base[:-1] if base.endswith("_") else base]
    for name, value in args.items():
        abbreviation = name.replace("_", "")[:length].upper()
        parts.append(f"{abbreviation}_{_format_job_value(value)}")
    return "_".join(parts)
# Allowed values for the `overwrite_strategy` argument of `submit_job`, which
# decides what happens when the output file already exists
OverwriteStrategyType = Literal[
    "error", "warn_skip", "silent_skip", "overwrite", "silent_overwrite"
]
def submit_job(
    script: str | Path,
    output: str | Path | None = None,
    name: str = "job",
    parameters: str | dict[str, Any] | None = None,
    config: str | dict[str, Any] | None = None,
    *,
    log_folder: str | Path | None = None,
    method: str = "auto",
    use_modelrunner: bool = True,
    template: str | Path | None = None,
    overwrite_strategy: OverwriteStrategyType = "error",
    **kwargs,
) -> tuple[str, str]:
    """submit a script to the cluster queue

    A submission script is rendered from a Jinja template and piped to the standard
    input of the submission program (`qsub`, `srun`, or `bash`).

    Args:
        script (str or :class:`~pathlib.Path`):
            Path to the script file, which contains the model
        output (str or :class:`~pathlib.Path`):
            Path to the output file, where all the results are saved
        name (str):
            Name of the job
        parameters (str or dict):
            Parameters for the script, either as a python dictionary or a string
            containing a JSON-encoded dictionary.
        config (str or dict):
            Configuration for the job, which determines how the job is run. Can be either
            a python dictionary or a string containing a JSON-encoded dictionary.
        log_folder (str or :class:`~pathlib.Path`):
            Path to the logging folder, which is created if necessary. If omitted,
            the default of the template is used, which typically sends data to stdout
            for local scripts (which is thus captured and returned by this function)
            or writes log files to the current working directory for remote jobs.
        method (str):
            Specifies the submission method. Currently `background`, `foreground`,
            `srun`, and `qsub` are supported. The special value `auto` reads the
            method from the `config` argument.
        use_modelrunner (bool):
            If True, `script` is invoked with the modelrunner library, e.g. by calling
            `python -m modelrunner {script}`.
        template (str or :class:`~pathlib.Path`):
            Jinja template file for submission script. If omitted, a standard template
            is chosen based on the submission method.
        overwrite_strategy (str):
            Determines what to do when files already exist. Possible options include
            `error`, `warn_skip`, `silent_skip`, `overwrite`, and `silent_overwrite`.
        **kwargs:
            Deprecated way of setting configuration values. Use `config` instead.

    Returns:
        tuple: The result `(stdout, stderr)` of the submission call. These two strings
        can contain the output from the actual scripts that is run when `log_folder`
        is `None`. If the job is skipped because the output exists, `stdout` is empty
        and `stderr` contains a corresponding message.

    Raises:
        RuntimeError: If the output exists and `overwrite_strategy` is `error`
        NotImplementedError: If the output exists and the strategy is unknown
        TypeError: If `parameters` is neither a string nor a dictionary
        ValueError: If the submission method is unknown
    """
    from jinja2 import Template

    logger = logging.getLogger("modelrunner.submit_job")

    # assemble the job configuration
    configuration = get_config(config)
    if kwargs:
        # deprecated since 2024-01-03
        warnings.warn("kwargs are deprecated. Use `config` instead", DeprecationWarning)
        for key, value in kwargs.items():
            configuration[key] = value
    if configuration["python_bin"] == "sys.executable":
        # replace the placeholder by the interpreter running this code
        configuration["python_bin"] = sys.executable

    # resolve the submission method
    if method == "auto":
        method = configuration["method"]

    # read the template of the submission script
    template_path = (
        Path(__file__).parent / "templates" / (method + ".jinja")
        if template is None
        else Path(template)
    )
    logger.info("Load template `%s`", template_path)
    with open(template_path) as template_file:
        script_template = template_file.read()

    # collect the variables available in the template
    script_args: dict[str, Any] = {
        "PACKAGE_PATH": Path(__file__).parents[2],
        "JOB_NAME": name,
        "MODEL_FILE": escape_string(script),
        "USE_MODELRUNNER": use_modelrunner,
        "CONFIG": configuration,
    }
    if log_folder is not None:
        ensure_directory_exists(log_folder)
        script_args["LOG_FOLDER"] = log_folder

    # command line arguments handed to the script
    job_args = []
    if parameters is not None and len(parameters) > 0:
        if isinstance(parameters, dict):
            parameters = json.dumps(parameters)
        elif not isinstance(parameters, str):
            raise TypeError("Parameters need to be given as a string or a dict")
        job_args.append(f"--json {escape_string(parameters)}")
    logger.debug("Job arguments: `%s`", job_args)

    if not output:
        # if `output` is not specified, save data to current directory
        script_args["OUTPUT_FOLDER"] = "."
    else:
        output = Path(output)
        if output.is_file():
            # the output already exists, so the strategy decides how to proceed
            exists_message = f"Output file `{output}` already exists"
            if overwrite_strategy == "error":
                raise RuntimeError(exists_message)
            if overwrite_strategy == "warn_skip":
                warnings.warn(exists_message)
                return "", exists_message  # do nothing
            if overwrite_strategy == "silent_skip":
                return "", exists_message  # do nothing
            if overwrite_strategy == "overwrite":
                warnings.warn(f"Output file `{output}` will be overwritten")
            elif overwrite_strategy != "silent_overwrite":
                raise NotImplementedError(f"Unknown strategy `{overwrite_strategy}`")
            # only the overwriting strategies reach this point: delete old output
            output.unlink()

        if output.is_dir():
            # `output` is a folder, so no explicit output file is passed on
            script_args["OUTPUT_FOLDER"] = shlex.quote(str(output))
        else:
            script_args["OUTPUT_FOLDER"] = shlex.quote(str(output.parent))
            job_args.append(f"--output {escape_string(output)}")

    script_args["JOB_ARGS"] = " ".join(job_args)

    # fill the template to obtain the final submission script
    script_content = Template(script_template).render(script_args)
    logger.debug("Script: `%s`", script_content)

    # choose the program that receives the script on its standard input
    popen_options: dict[str, Any] = {}
    if method in {"qsub", "srun"}:
        # submit job to queue
        command = [method]
    elif method == "foreground":
        # run job locally in the foreground, blocking further calls
        command = ["bash"]
        popen_options["bufsize"] = 0  # write output immediately
    elif method == "background":
        # run job locally in the background
        command = ["bash"]
    else:
        raise ValueError(f"Unknown submit method `{method}`")

    proc = sp.Popen(
        command,
        stdin=sp.PIPE,
        stdout=sp.PIPE,
        stderr=sp.PIPE,
        universal_newlines=True,
        **popen_options,
    )
    return proc.communicate(script_content)
def submit_jobs(
    script: str | Path,
    output_folder: str | Path,
    name_base: str = "job",
    parameters: str | dict[str, Any] | None = None,
    config: str | dict[str, Any] | None = None,
    *,
    output_format: str = "hdf",
    list_params: Iterable[str] | None = None,
    **kwargs,
) -> int:
    """submit many jobs of the same script with different parameters to the cluster

    Args:
        script (str or :class:`~pathlib.Path`):
            Path to the script file, which contains the model
        output_folder (str or :class:`~pathlib.Path`):
            Path to the output folder, where all the results are saved
        name_base (str):
            Base name of the job. An automatic name is generated on this basis.
        parameters (str or dict):
            Parameters for the script, either as a python dictionary or a string
            containing a JSON-encoded dictionary. All combinations of parameter values
            that are iterable and not strings and not part of `list_params` are
            submitted as separate jobs.
        config (str or dict):
            Configuration for the job, which determines how the job is run. Can be either
            a python dictionary or a string containing a JSON-encoded dictionary.
        output_format (str):
            File extension determining the output format. A leading dot is optional.
        list_params (list):
            Names of parameters that are meant to be lists. They will be submitted as
            individual parameters and not iterated over to produce multiple jobs. A
            single string is interpreted as the name of one parameter.
        **kwargs:
            All additional parameters are forwarded to :func:`submit_job`.

    Returns:
        int: The number of jobs that have been submitted
    """
    if parameters is None:
        parameter_dict: dict[str, Any] = {}
    elif isinstance(parameters, str):
        parameter_dict = json.loads(parameters)
    else:
        parameter_dict = parameters

    # Store the names in a set once: membership is tested for every parameter, which
    # would exhaust a one-shot iterator after the first test and which would do
    # substring matching if a plain string was given
    if list_params is None:
        fixed_lists: set[str] = set()
    elif isinstance(list_params, str):
        fixed_lists = {list_params}
    else:
        fixed_lists = set(list_params)

    # detect varying parameters
    params: dict[str, Any] = {}
    p_vary: dict[str, Any] = {}
    for name, value in parameter_dict.items():
        varies = (
            hasattr(value, "__iter__")
            and not isinstance(value, str)
            and name not in fixed_lists
        )
        if varies:
            p_vary[name] = value
        else:
            params[name] = value

    # build the list of all varying arguments (a single empty dictionary, and thus a
    # single job, if no parameter varies)
    p_vary_list = [
        dict(zip(p_vary.keys(), values))
        for values in itertools.product(*p_vary.values())
    ]

    if not output_format.startswith("."):
        output_format = "." + output_format

    # submit jobs with all parameter variations
    for p_job in tqdm(p_vary_list):
        name = get_job_name(name_base, p_job)
        output = Path(output_folder) / f"{name}{output_format}"
        submit_job(
            script,
            output=output,
            name=name,
            parameters={**params, **p_job},  # constant and varying parameters
            config=config,
            **kwargs,
        )

    return len(p_vary_list)