import os
import re
import stat
import subprocess
from pathlib import Path
from textwrap import dedent, indent
from typing import Any, Optional, Sequence

from . import __version__
from .format import format_with_config
from .templates import (
    JOB_INTERACTIVE_TEMPLATE,
    JOB_SCRIPT_TEMPLATE,
    VARS_TEMPLATE,
)
# Public names of this module: the ones exported by a wildcard import.
# Helpers whose names start with an underscore are intentionally left out.
__all__ = [
"create_job_dir",
"create_job_interactive_script_source",
"create_job_script_source",
"submit_job",
"submit_job_dir",
]
def submit_job(
    job_dir: str,
    *,
    src: Sequence[str] = (),
    on_run: str = "",
    on_run_resume: str = "",
    setup: str = "",
    setup_resume: str = "",
    teardown: str = "",
    sbatch_options: Optional[dict[str, Any]] = None,
    cleanup_seconds: int = 120,
    submit: bool = True,
    interactive: bool = False,
    resubmit_limit: int = 64,
    config: Optional[dict[str, Any]] = None,
) -> str:
    """Submits job.

    Creates job directory with frozen src and submits job to slurm.

    Args:
        job_dir (str):
            Path to directory to keep all job files including
            ``src.tar.gz`` and the auto-generated ``job.sh`` and
            ``job_interactive.sh``. The path is first formatted with
            ``config`` (plus a ``job_name`` entry taken from the
            ``"job-name"`` sbatch option, or ``"untitled"`` if absent),
            then environment variables are expanded and the result is
            made absolute.
        src (list[str]):
            Path to directories containing only source code.
            These will be archived in ``$JOB_DIR/src.tar.gz`` and
            extracted during job run into ``$SLURM_TMPDIR``.
        on_run (str):
            Bash code executed in "on_run" stage, but only for new jobs
            that are running for the first time.
            Must be a single command only.
            Optionally, the command may gracefully handle interrupts.
        on_run_resume (str):
            Bash code executed in "on_run" stage, but only for jobs that
            are resuming from previous incomplete runs.
            Must be a single command only.
            Optionally, the command may gracefully handle interrupts.
        setup (str):
            Bash code executed in "setup" stage, but only for new jobs
            that are running for the first time.
        setup_resume (str):
            Bash code executed in "setup" stage, but only for jobs that
            are resuming from previous incomplete runs.
            To reuse the code from ``setup``, simply set this to
            ``"setup"``, which calls the code inside the ``setup``
            function.
        teardown (str):
            Bash code executed in "teardown" stage.
        sbatch_options (dict[str, Any]):
            Dictionary of options to pass to sbatch.
            Default is ``None``, which is treated as an empty dictionary.
        cleanup_seconds (int):
            Interrupts a job n seconds before timeout to run cleanup
            tasks (teardown, auto-schedule new job).
            Default is 120 seconds.
        submit (bool):
            Submit created job to scheduler. Set this to ``False`` if
            you are manually submitting the created ``$JOB_DIR`` later.
            Default is ``True``.
        interactive (bool):
            Run as a blocking interactive job. Default is ``False``.
        resubmit_limit (int):
            Maximum number of times to auto-submit a job for "resume".
            (Not entirely unlike submitting a resume for a job.)
            Default is 64 resubmissions.
        config (dict[str, Any]):
            A dictionary of configuration values to use for formatting.
            Default is ``None``, which is treated as an empty dictionary.

    Returns:
        Path to the newly created job directory.
    """
    # Use None sentinels instead of shared mutable ``{}`` defaults so that
    # no state can ever leak between calls.
    sbatch_options = {} if sbatch_options is None else sbatch_options
    config = {} if config is None else config

    job_name = format_with_config(
        sbatch_options.get("job-name", "untitled"), config
    )
    # The job directory template may refer to the formatted job name.
    job_dir = _expand_path(
        format_with_config(job_dir, {**config, "job_name": job_name})
    )

    create_job_dir(job_dir, src)

    _write_script(
        filename=f"{job_dir}/job.sh",
        text=create_job_script_source(
            sbatch_options=sbatch_options,
            on_run=on_run,
            on_run_resume=on_run_resume,
            setup=setup,
            setup_resume=setup_resume,
            teardown=teardown,
            job_dir=job_dir,
            cleanup_seconds=cleanup_seconds,
            resubmit_limit=resubmit_limit,
        ),
    )

    # The interactive wrapper is always written, even for batch submissions,
    # so the job directory can be run interactively later.
    _write_script(
        filename=f"{job_dir}/job_interactive.sh",
        text=create_job_interactive_script_source(
            sbatch_options=sbatch_options,
            job_path=f"{job_dir}/job.sh",
            job_dir=job_dir,
            cleanup_seconds=cleanup_seconds,
        ),
    )

    if submit:
        submit_job_dir(job_dir, interactive=interactive)

    return job_dir
def create_job_script_source(
    sbatch_options: dict[str, Any],
    on_run: str,
    on_run_resume: str,
    setup: str,
    setup_resume: str,
    teardown: str,
    job_dir: str,
    cleanup_seconds: int,
    resubmit_limit: int,
) -> str:
    """Returns source for job script.

    Fills ``JOB_SCRIPT_TEMPLATE`` with the ``#SBATCH`` header lines, the
    variables block rendered from ``VARS_TEMPLATE``, and the user-supplied
    bash snippets for each stage.

    Args:
        sbatch_options: Options rendered as ``#SBATCH`` header lines.
        on_run: Command for the "on_run" stage of a new job.
        on_run_resume: Command for the "on_run" stage of a resumed job.
        setup: Bash code for the "setup" stage of a new job.
        setup_resume: Bash code for the "setup" stage of a resumed job.
        teardown: Bash code for the "teardown" stage.
        job_dir: Job directory; environment variables are expanded and the
            path is made absolute before use.
        cleanup_seconds: Lead time used in the sbatch ``signal`` option.
        resubmit_limit: Value substituted into the variables block.

    Returns:
        The complete script text.
    """
    abs_job_dir = _expand_path(job_dir)

    def _as_command(code: str) -> str:
        # Trim surrounding whitespace and escape single quotes.
        # NOTE(review): presumably the template embeds these commands inside
        # single quotes -- confirm against JOB_SCRIPT_TEMPLATE.
        return _quote_single_quotes(code.strip())

    header = _sbatch_options_to_str(
        sbatch_options, abs_job_dir, cleanup_seconds
    )
    variables = VARS_TEMPLATE.format(
        easy_slurm_version=__version__,
        job_dir=abs_job_dir,
        resubmit_limit=resubmit_limit,
    )

    return JOB_SCRIPT_TEMPLATE.format(
        sbatch_options_str=header,
        vars_str=variables,
        on_run=_as_command(on_run),
        on_run_resume=_as_command(on_run_resume),
        # Multi-line snippets are re-indented by one level.
        setup=_fix_indent(setup, 1),
        setup_resume=_fix_indent(setup_resume, 1),
        teardown=_fix_indent(teardown, 1),
    )
def create_job_interactive_script_source(
    sbatch_options: dict[str, Any],
    job_dir: str,
    job_path: str,
    cleanup_seconds: int,
) -> str:
    """Returns source for interactive job script.

    Fills ``JOB_INTERACTIVE_TEMPLATE`` with the command-line form of the
    sbatch options and the path of the job script to run.

    Args:
        sbatch_options: Options rendered as ``--key=value`` arguments.
        job_dir: Job directory; environment variables are expanded and the
            path is made absolute before use.
        job_path: Path to the job script; expanded the same way as
            ``job_dir``.
        cleanup_seconds: Lead time used in the ``signal`` option.

    Returns:
        The complete interactive script text.
    """
    abs_job_dir = _expand_path(job_dir)
    abs_job_path = _expand_path(job_path)
    options_text = _sbatch_options_to_str_interactive(
        sbatch_options, abs_job_dir, cleanup_seconds
    )
    return JOB_INTERACTIVE_TEMPLATE.format(
        sbatch_options_str=options_text,
        job_path=abs_job_path,
    )
def create_job_dir(job_dir: str, src: Sequence[str]):
    """Creates job directory and freezes all necessary files.

    The directory is created if missing. The given source directories are
    archived into ``src.tar.gz`` inside it, and a fresh ``status`` file is
    written that marks the job as new with a resubmit count of zero.

    Args:
        job_dir: Job directory; environment variables are expanded and the
            path is made absolute before use.
        src: Source directories to archive; each is expanded the same way.
    """
    job_root = _expand_path(job_dir)
    source_dirs = [_expand_path(path) for path in src]

    os.makedirs(job_root, exist_ok=True)
    _create_tar_dir(source_dirs, f"{job_root}/src.tar.gz")

    status_lines = (
        "status=new",
        f"easy_slurm_version={__version__}",
        "resubmit_count=0",
    )
    with open(f"{job_root}/status", "w") as f:
        f.writelines(f"{line}\n" for line in status_lines)
def submit_job_dir(job_dir: str, interactive: bool):
    """Submits a ``$JOB_DIR`` created by easy_slurm to slurm.

    Note that ``submit_job`` already does this for the user,
    except when it is called with ``submit=False``.

    Args:
        job_dir: Directory containing ``job.sh`` and ``job_interactive.sh``.
        interactive: If true, run ``job_interactive.sh`` directly and block
            until it exits; no job id is recorded. Otherwise submit
            ``job.sh`` with ``sbatch`` and write the resulting job id to
            ``$JOB_DIR/job_ids``.

    Raises:
        subprocess.CalledProcessError: If the launched command exits with a
            non-zero status.
        RuntimeError: If no job id can be found in the ``sbatch`` output.
    """
    if interactive:
        subprocess.run(
            [f"{job_dir}/job_interactive.sh"], check=True, text=True
        )
        return

    result = subprocess.run(
        ["sbatch", f"{job_dir}/job.sh"],
        check=True,
        capture_output=True,
        text=True,
    )

    # Search line by line and do not anchor the end of the line: sbatch may
    # append a suffix (e.g. " on cluster <name>") or print other lines first.
    m = re.search(
        r"^Submitted batch job (\d+)", result.stdout, flags=re.MULTILINE
    )
    if m is None:
        # Fail with the actual output rather than an AttributeError on None.
        raise RuntimeError(
            f"Could not parse job id from sbatch output: {result.stdout!r}"
        )
    job_id = int(m.group(1))

    with open(f"{job_dir}/job_ids", "w") as f:
        print(job_id, file=f)
def _expand_path(path: str) -> str:
return "" if path == "" else os.path.abspath(os.path.expandvars(path))
def _create_tar_dir(src, dst, root_name=None):
if not src:
src_args = ["-T", "/dev/null"]
else:
src_args = [
arg
for srcdir in src
for arg in ["-C", Path(srcdir).parent, Path(srcdir).name]
]
cmd = ["tar", "czf", dst, *src_args]
if root_name is not None:
cmd.extend(["--transform", rf"s/^/{root_name}\//"])
subprocess.run(cmd, check=True)
def _write_script(filename: str, text: str):
with open(filename, "w") as f:
print(text, file=f)
st = os.stat(filename)
os.chmod(filename, st.st_mode | stat.S_IEXEC)
def _sbatch_options_to_str(
sbatch_options: dict[str, Any], job_dir: str, cleanup_seconds: int
) -> str:
sbatch_options = {
**sbatch_options,
"output": f"{job_dir}/slurm_jobid%j_%x.out",
"signal": f"B:USR1@{cleanup_seconds}", # send USR1 to Bash before job end time
}
return "\n".join(f"#SBATCH --{k}={v}" for k, v in sbatch_options.items())
def _sbatch_options_to_str_interactive(
sbatch_options: dict[str, Any], job_dir: str, cleanup_seconds: int
) -> str:
sbatch_options = {
**sbatch_options,
"signal": f":USR1@{cleanup_seconds}", # send USR1 to Bash before job end time
}
return "\n".join(f" --{k}={v}" for k, v in sbatch_options.items())
def _quote_single_quotes(s: str) -> str:
"""Replaces ' with '"'"'."""
return s.replace("'", """'"'"'""")
def _fix_indent(x: str, level: int = 0) -> str:
return indent(dedent(x.strip("\n")), " " * level).rstrip("\n")