Source code for rl_salamandra_alignment.generate_scripts

"""Tools for generating scripts by filling out templates"""
import yaml
import os
import json
from typing import Union
from copy import deepcopy
from datetime import datetime
from rl_salamandra_alignment.distributed_configs import get_distributed_config_path
from rl_salamandra_alignment.trl_scripts import get_script_path
from rl_salamandra_alignment import templates
from rl_salamandra_alignment import logger
from rl_salamandra_alignment.utils.general import (
    unfold_dict
)
from rl_salamandra_alignment import logger


def generate_slurm_preamble(sbatch_args: dict) -> str:
    """Generate the preamble for a slurm job.

    The caller's dictionary is left untouched: the forced resource settings
    are applied to a merged copy.

    Args:
        sbatch_args (dict): Arguments for Sbatch, keyed by the option name
            without the leading dashes (e.g. ``"job-name"``).

    Returns:
        str: Preamble with all #SBATCHs filled, terminated by two extra
            newlines.
    """
    # Always ask for 4 gpus and 80 cpus per task. These override any value
    # supplied by the caller; keys already present keep their position.
    effective_args = {
        **sbatch_args,
        "gres": "gpu:4",
        "cpus-per-task": 80,
    }

    # Fill "#SBATCH" arguments
    directives = [
        f"#SBATCH --{arg_name}={arg_value}\n"
        for arg_name, arg_value in effective_args.items()
    ]
    return "".join(directives) + "\n" * 2
def replace_in_template(
    text_template: str,
    variable_name: str,
    variable_value: Union[str, int, float],
) -> str:
    """Inside a template, replaces a variable name "{{MY_VAR}}" by a value

    Every occurrence of the placeholder is replaced. A template that does not
    contain the placeholder is returned unchanged.

    Args:
        text_template (str): Template
        variable_name (str): Name of the variable in all caps.
        variable_value (Union[str, int, float]): Value of the variable

    Returns:
        str: template with the filled slot
    """
    placeholder = "{{" + variable_name + "}}"
    # str.replace only accepts strings, so numeric values are converted
    # explicitly to honour the advertised Union[str, int, float].
    return text_template.replace(placeholder, str(variable_value))
def get_output_dir(config: dict) -> Union[str, None]:
    """Extract the field 'output directory' from a config dict, making sure it is a string

    Args:
        config (dict): execution config dict for experiment

    Raises:
        ValueError: Raised if the value is not a string (e.g. a list).

    Returns:
        Union[str, None]: Path to the output directory, or None (after
            logging a warning) when 'execution'.'output_dir' cannot be
            looked up in the config.
    """
    # Only the lookup is guarded. Previously a bare `except:` wrapped the
    # whole body and swallowed the ValueError raised below, so a non-string
    # value silently produced None.
    try:
        output_dir = config["execution"]["output_dir"]
    except (LookupError, TypeError):
        logger.warning(
            "'output_dir' must be specified in your config file under 'execution'")
        return None

    if not isinstance(output_dir, str):
        logger.warning(f"Found 'output_dir of type {type(output_dir)}")
        raise ValueError("'output_dir' must be a string")
    return output_dir
def setup_macro_output_dir_tree(output_dir: str) -> None:
    """Create the top-level folder layout of an experiment.

    The root directory is created first, followed by one child folder per
    output category. Folders that already exist are left as they are.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
    """
    category_folders = (
        "slurm_scripts",
        "slurm_logs",
        "wandb",
        "cache",
        "training",
        "configs",
        "evaluation",
    )
    # Root first, then every category folder directly beneath it.
    wanted_paths = [output_dir]
    wanted_paths.extend(
        os.path.join(output_dir, folder) for folder in category_folders
    )
    for path in wanted_paths:
        os.makedirs(path, exist_ok=True)
def _internal_dir_paths(output_dir: str, id: str) -> dict: """Generate paths to directories used for the outputs of a subexperiment. Args: output_dir (str): root directory for the outputs of the experiment. id (str): Sub-experiment id Returns: dict: Dictionary with the paths to the directories for the subexperiment """ d = { k: os.path.join(output_dir, k, id) for k in ["wandb", "cache", "training", "evaluation"] } return d def _internal_file_paths(output_dir: str, id: str): """Generate paths to directories used for the outputs of a subexperiment. Args: output_dir (str): root directory for the outputs of the experiment. id (str): Sub-experiment id Returns: dict: Dictionary with the paths to the files for the subexperiment """ d = { "slrum_training_distributed_run": os.path.join(output_dir, "slurm_scripts", f"distributed_run_{id}.job"), "slurm_training_launch": os.path.join(output_dir, "slurm_scripts", f'launch_{id}.sh'), "slurm_training_output": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_training.log"), "slurm_training_error": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_training.err"), "slurm_eval_harness_job": os.path.join(output_dir, "slurm_scripts", f"eval_harness_{id}.job"), "slurm_eval_harness_output": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_eval_harness.log"), "slurm_eval_harness_error": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_eval_harness.err"), "slurm_eval_local_job": os.path.join(output_dir, "slurm_scripts", f"eval_local_{id}.job"), "slurm_eval_local_output": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_eval_local.log"), "slurm_eval_local_error": os.path.join(output_dir, "slurm_logs", id, f"%j_%x_eval_local.err"), "config": os.path.join(output_dir, "configs", f"config_{id}.yaml") } return d
def setup_micro_output_dir_tree(
    output_dir: str,
    config: dict,
    id: str
) -> None:
    """Create the folders of one subexperiment and store its config.

    The slurm script and slurm log locations are not created here; they are
    handled by the 'generate_*_script' functions.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment
        id (str): Sub-experiment id
    """
    subexperiment_dirs = _internal_dir_paths(output_dir, id)
    for category in ("wandb", "cache", "training", "evaluation"):
        os.makedirs(subexperiment_dirs[category], exist_ok=True)

    # Dump the subexperiment config as YAML under the "configs" folder.
    config_path = _internal_file_paths(output_dir, id)["config"]
    with open(config_path, "w", encoding="utf-8") as config_file:
        yaml.dump(
            config,
            config_file,
            default_flow_style=False,
            allow_unicode=True,
        )
def generate_distributed_run_script(
    output_dir: str,
    config: dict,
    id: str,
) -> str:
    """Write the script for distributed execution of a subexperiment, by filling out the script template.

    Fills the "SBATCH_PARAMETERS" and "LAUNCH_SCRIPT" slots of the
    distributed-run template. The text is only returned, not written to disk.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment. Must contain
            a "slurm" section with a "job-name" entry.
        id (str): Sub-experiment id

    Returns:
        str: Text of the script for distributed execution
    """
    filled_template = templates.get_distributed_run_template()
    internal_file_paths = _internal_file_paths(output_dir, id)

    # ============
    # Slurm preamble
    # ============
    # Work on a copy so the subexperiment config is not modified.
    slurm_config = deepcopy(config["slurm"])
    slurm_config["job-name"] = f"{id}_train_" + slurm_config["job-name"]
    # Automatically determine log files
    slurm_config["output"] = internal_file_paths["slurm_training_output"]
    slurm_config["error"] = internal_file_paths["slurm_training_error"]

    slurm_preamble = generate_slurm_preamble(slurm_config)
    filled_template = replace_in_template(
        filled_template,
        "SBATCH_PARAMETERS",
        slurm_preamble
    )

    # ============
    # Launch script
    # ============
    launch_script_path = internal_file_paths["slurm_training_launch"]
    filled_template = replace_in_template(
        filled_template,
        "LAUNCH_SCRIPT",
        launch_script_path
    )

    return filled_template
def get_script_args_string(script_args_dict: dict) -> str:
    """Render a dictionary as command-line style ``--key value`` lines.

    Each entry becomes one tab-indented line, and the lines are joined with
    newlines. An empty dictionary gives an empty string.

    Args:
        script_args_dict (dict): python dictionary to convert

    Returns:
        str: one ``--key value`` line per dictionary entry
    """
    rendered_lines = []
    for option_name, option_value in script_args_dict.items():
        rendered_lines.append(f"\t--{option_name} {option_value}")
    return "\n".join(rendered_lines)
def generate_launch_script(
    output_dir: str,
    config: dict,
    id: str
) -> str:
    """Write the script for launching a subexperiment, by filling out the script template.

    The config passed in is not modified, so the function can be called
    repeatedly on the same config with the same result.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment. The sections
            "environment", "execution", "model_config_args",
            "rl_script_args" and "rl_config_args" are read.
        id (str): Sub-experiment id

    Returns:
        str: Text of the script for launching the subexperiment
    """
    filled_template = templates.get_launch_script_template()
    internal_dir_paths = _internal_dir_paths(output_dir, id)

    # ============
    # Environment variables
    # ============
    environment_dict = deepcopy(config["environment"])
    # Automatically determine WANDB_DIR
    environment_dict["WANDB_DIR"] = internal_dir_paths["wandb"]
    # Automatically determine dir for training (checkpoints)
    environment_dict["TRAINING_OUTPUT_DIR"] = internal_dir_paths["training"]
    # original model dir
    environment_dict["ORIGINAL_MODEL_PATH"] = config["model_config_args"]["model_name_or_path"]
    # venv
    venv_dir = config["execution"]["venv"]
    environment_dict["VENV_DIR"] = venv_dir

    # handle python 3.12 venvs (the name must contain "python" and "3.12",
    # or "312")
    if "python" in venv_dir and ("3.12" in venv_dir or "312" in venv_dir):
        load_modules = "module load impi intel hdf5 mkl cuda/12.6 python/3.12.1-gcc"
        set_pythonpath = "export PYTHONPATH=\"$VENV_DIR/lib/python3.12/site-packages\""
    else:
        # Any other venv: both slots are blanked out.
        load_modules = ""
        set_pythonpath = ""
    filled_template = replace_in_template(
        filled_template,
        "LOAD_MODULES",
        load_modules
    )
    filled_template = replace_in_template(
        filled_template,
        "SET_PYTHONPATH",
        set_pythonpath
    )

    # Generate export statements:
    environment_variables = "\n".join(
        f'export {k}="{v}"'
        for k, v in environment_dict.items()
    )
    filled_template = replace_in_template(
        filled_template,
        "ENVIRONMENT_VARIABLES",
        environment_variables
    )

    # ============
    # RL Dataset
    # ============
    rl_dataset_path = config["rl_script_args"]["dataset_name"]
    filled_template = replace_in_template(
        filled_template,
        "RL_DATASET_PATH",
        rl_dataset_path
    )

    # ============
    # Cache
    # ============
    filled_template = replace_in_template(
        filled_template,
        "CACHE_DIR",
        internal_dir_paths["cache"]
    )

    # ============
    # Distributed config
    # ============
    ds_config_path = get_distributed_config_path(
        config["execution"]["distributed_config"]
    )
    filled_template = replace_in_template(
        filled_template,
        "DS_CONFIG_PATH",
        ds_config_path
    )

    # ============
    # RL script args
    # ============
    rl_script_args = deepcopy(config["rl_script_args"])
    # Avoid using the original path to the RL dataset: the RL dataset is
    # converted to a format that can be locally run in MN5, and this
    # conversion happens before launching the fine-tuning.
    # The override is applied to the copy only. Writing it into `config`
    # would make a second call see "$RL_DATASET_PATH" as the dataset path.
    rl_script_args["dataset_name"] = "$RL_DATASET_PATH"
    filled_template = replace_in_template(
        filled_template,
        "RL_SCRIPT_ARGS",
        get_script_args_string(rl_script_args)
    )

    # ============
    # RL config args
    # ============
    rl_config_args = deepcopy(config["rl_config_args"])
    # Automatically determine logging dir for training
    rl_config_args["logging_dir"] = "$TRAINING_OUTPUT_DIR/logs"
    # This is very important for distributed execution
    rl_config_args["local_rank"] = "$SLURM_LOCALID"
    # Name for WanDB syncing
    rl_config_args["run_name"] = "$WANDB_NAME"
    filled_template = replace_in_template(
        filled_template,
        "RL_CONFIG_ARGS",
        get_script_args_string(rl_config_args)
    )

    # ============
    # Model args
    # ============
    model_config_args = deepcopy(config["model_config_args"])
    # automatically determine path to original model
    model_config_args["model_name_or_path"] = "$ORIGINAL_MODEL_PATH"
    # automatically determine training dir:
    model_config_args["output_dir"] = "$TRAINING_OUTPUT_DIR"
    filled_template = replace_in_template(
        filled_template,
        "MODEL_CONFIG_ARGS",
        get_script_args_string(model_config_args)
    )

    # ============
    # RL script path
    # ============
    rl_script_path = get_script_path(config["execution"]["algorithm"])
    filled_template = replace_in_template(
        filled_template,
        "RL_SCRIPT_PATH",
        rl_script_path
    )

    return filled_template
def generate_one_training_job(output_dir: str, config: dict, id: str) -> dict:
    """Create and save the TRAINING slurm scripts of a subexperiment.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment
        id (str): Sub-experiment id

    Returns:
        dict: paths to the distributed execution script and the launching script
    """
    # Build both script texts before touching the file system.
    launch_text = generate_launch_script(output_dir, config, id)
    distributed_text = generate_distributed_run_script(output_dir, config, id)

    file_paths = _internal_file_paths(output_dir, id)
    # (path key, script text) pairs, in the order they are written.
    scripts_to_write = (
        ("slrum_training_distributed_run", distributed_text),
        ("slurm_training_launch", launch_text),
    )

    written_paths = {}
    for path_key, script_text in scripts_to_write:
        target_path = file_paths[path_key]
        with open(target_path, "w") as script_file:
            script_file.write(script_text)
        written_paths[path_key] = target_path
    return written_paths
def generate_local_eval_script(output_dir: str, config: dict, id: str) -> str:
    """Generate the LOCAL evaluation slurm script for a subexperiment (for example, red teaming)

    Not implemented yet: this is a placeholder that ignores its arguments
    and returns an empty script.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment
        id (str): Sub-experiment id

    Returns:
        str: Text of the local evaluation script (currently always empty)
    """
    # TODO: implement the local evaluation script
    return ""
def generate_harness_eval_script(output_dir: str, config: dict, id: str) -> str:
    """Generate the HARNESS evaluation slurm script for a subexperiment

    The text is only returned, not written to disk.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment. Must contain
            "slurm"."job-name", "model_config_args"."model_name_or_path"
            and "evaluation"."harness_tasks".
        id (str): Sub-experiment id

    Returns:
        str: Text of the harness evaluation script
    """
    filled_template = templates.get_harness_template()
    internal_file_paths = _internal_file_paths(output_dir, id)
    internal_dir_paths = _internal_dir_paths(output_dir, id)

    # ============
    # Slurm preamble
    # ============
    # Keep the same job name for eval and training (different prefixes)
    job_name = config["slurm"]["job-name"]

    # First look for specific slurm requirements for harness eval,
    # if not found, use the same slurm requirements from training.
    if "harness_slurm" in config.get("evaluation", ()):
        slurm_config = deepcopy(config["evaluation"]["harness_slurm"])
    else:
        logger.warning(
            "Did not find custom SLURM requirements for Harness Evaluation.\n"
            "You can specify them in your config, under 'evaluation'.'harness_slurm'\n"
            "Defaulting to the same SLURM requirements used for training."
        )
        slurm_config = deepcopy(config["slurm"])

    # Automatically determine log files and jobname
    slurm_config["job-name"] = f"{id}_eval_harn_" + job_name
    slurm_config["output"] = internal_file_paths["slurm_eval_harness_output"]
    slurm_config["error"] = internal_file_paths["slurm_eval_harness_error"]

    slurm_preamble = generate_slurm_preamble(slurm_config)
    filled_template = replace_in_template(
        filled_template,
        "SBATCH_PARAMETERS",
        slurm_preamble
    )

    # ============
    # Model info
    # ============
    # Get the trained model
    model_path = internal_dir_paths["training"]
    model_name = config["model_config_args"]["model_name_or_path"]
    model_name = f"{id}_{os.path.basename(model_name)}"
    filled_template = replace_in_template(
        filled_template,
        "MODEL_PATH",
        model_path
    )
    filled_template = replace_in_template(
        filled_template,
        "MODEL_NAME",
        model_name
    )

    # ============
    # Task list
    # ============
    task_list = config["evaluation"]["harness_tasks"]
    # A plain string is used as is: joining it would put a comma between
    # every character. Any other iterable is joined into a comma-separated
    # list.
    if not isinstance(task_list, str):
        task_list = ",".join(task_list)
    filled_template = replace_in_template(
        filled_template,
        "TASK_LIST",
        task_list
    )

    # ============
    # Harness output_path
    # ============
    harness_output_path = os.path.join(
        internal_dir_paths["evaluation"],
        "harness"
    )
    filled_template = replace_in_template(
        filled_template,
        "HARNESS_EVAL_OUTPUT_PATH",
        harness_output_path
    )

    # ============
    # Cache path
    # ============
    filled_template = replace_in_template(
        filled_template,
        "CACHE_DIR",
        internal_dir_paths["cache"]
    )

    return filled_template
def generate_eval_scripts_for_one_training(output_dir: str, config: dict, id: str) -> dict:
    """Create and save the EVALUATION slurm scripts of a subexperiment.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment
        id (str): Sub-experiment id

    Returns:
        dict: paths to the evaluation scripts
    """
    # Harness script text first, then the local evaluation script text.
    harness_text = generate_harness_eval_script(output_dir, config, id)
    local_text = generate_local_eval_script(output_dir, config, id)

    file_paths = _internal_file_paths(output_dir, id)
    # (path key, script text) pairs, in the order they are written.
    scripts_to_write = (
        ("slurm_eval_harness_job", harness_text),
        ("slurm_eval_local_job", local_text),
    )

    written_paths = {}
    for path_key, script_text in scripts_to_write:
        target_path = file_paths[path_key]
        with open(target_path, "w") as script_file:
            script_file.write(script_text)
        written_paths[path_key] = target_path
    return written_paths
def generate_one_job_set(output_dir: str, config: dict, id: str) -> dict:
    """Generate the slurm scripts for a subexperiment (both training and evaluation)

    Evaluation scripts are only generated when the config has a non-empty
    "evaluation" section.

    Args:
        output_dir (str): root directory for the outputs of the experiment.
        config (dict): execution config dict for subexperiment
        id (str): Sub-experiment id

    Returns:
        dict: paths to the scripts for training and evaluation
    """
    # First generate training job scripts
    job_set_paths = dict(generate_one_training_job(output_dir, config, id))

    # Then generate evaluation job scripts
    if config.get("evaluation"):
        job_set_paths.update(
            generate_eval_scripts_for_one_training(output_dir, config, id)
        )

    return job_set_paths
def get_config_ids(config_list: list) -> list:
    """Pair every config with an ID identifying its subexperiment.

    An ID is the zero-padded (two digit) position of the config in the list,
    then a double underscore, then a timestamp with minute resolution that
    is shared by all configs of the same call.

    Args:
        config_list (list): List of config dicts

    Returns:
        list: List of tuples (id, config)
    """
    shared_stamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
    return [
        (f"{position:02d}__{shared_stamp}", subexperiment_config)
        for position, subexperiment_config in enumerate(config_list)
    ]
def generate_all_job_files(config: dict) -> list[dict]:
    """Generate the slurm scripts for an experiment

    Args:
        config (dict): execution config dict for experiment

    Raises:
        ValueError: Raised if no output directory could be read from the
            config.

    Returns:
        list[dict]: for each subexperiment, paths to the generated training
            scripts (and evaluation scripts, when evaluation is configured)
    """
    output_dir = get_output_dir(config)
    if output_dir is None:
        # Fail with a clear message here rather than with an opaque
        # TypeError when the directories are created.
        raise ValueError(
            "Cannot generate job files: 'output_dir' is missing from the "
            "'execution' section of the config"
        )

    # build the tree structure
    setup_macro_output_dir_tree(output_dir)

    # Generate the configs for all the subexperiments
    unfolded_configs = unfold_dict(config)

    # Give an ID to each subexperiment config
    unfolded_configs_with_id = get_config_ids(unfolded_configs)

    # build the tree structure for each subexperiment
    for id, cfg in unfolded_configs_with_id:
        setup_micro_output_dir_tree(output_dir, cfg, id)

    # Generate all scripts
    return [
        generate_one_job_set(output_dir, cfg, id)
        for id, cfg in unfolded_configs_with_id
    ]