
"""Main library entrypoint."""

import copy
import math
import os
import random
import shutil
import subprocess
import sys
import tempfile

import numpy as np
import tensorflow as tf
import yaml

from opennmt import config as config_util
from opennmt import evaluation, inference, models
from opennmt import training as training_util
from opennmt.config import MODEL_DESCRIPTION_FILENAME
from opennmt.utils import checkpoint as checkpoint_util
from opennmt.utils import misc
from opennmt.version import __version__

# These options require a value, but we can fall back to a default one.
_CONFIG_FALLBACK = {
    "params": {},
    "train": {
        "batch_type": "examples",
        "length_bucket_width": 2,
        "sample_buffer_size": 500000,
        "save_summary_steps": 100,
    },
    "eval": {
        "length_bucket_width": None,
        "batch_type": "examples",
        "batch_size": 32,
    },
    "infer": {
        "length_bucket_width": None,
        "batch_type": "examples",
        "batch_size": 16,
    },
    "score": {
        "length_bucket_width": None,
        "batch_type": "examples",
        "batch_size": 64,
    },
}
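
# Illustrative sketch (not part of the library code): the fallback values above are
# only used when the run configuration does not set them. A minimal user config such
# as the following would keep every fallback except the overridden "infer" batch size
# (paths and values below are placeholders):
#
#     config = {
#         "model_dir": "run/",
#         "data": {...},
#         "infer": {"batch_size": 32},  # overrides the fallback value of 16
#     }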


class Runner:
    """Class for running and exporting models."""

    def __init__(
        self,
        model,
        config,
        auto_config=None,
        mixed_precision=False,
        jit_compile=False,
        seed=None,
    ):
        """Initializes the runner parameters.

        Args:
          model: A :class:`opennmt.models.Model` instance to run or a callable
            that returns such instance.
          config: The run configuration.
          auto_config: If ``True``, use automatic configuration values defined by
            :obj:`model`. If not set, the parameter is read from the run
            configuration.
          mixed_precision: Enable mixed precision.
          jit_compile: Compile the model with XLA when possible.
          seed: The random seed to set.

        Raises:
          TypeError: if :obj:`model` is not a :class:`opennmt.models.Model` instance
            or a callable.
        """
        if isinstance(model, models.Model):
            self._model = model
            self._model_fn = lambda: misc.clone_layer(model)
        elif callable(model):
            self._model = model()
            self._model_fn = model
        else:
            raise TypeError(
                "model should be a opennmt.models.Model instance or a callable"
            )
        tf.get_logger().info("Using OpenNMT-tf version %s", __version__)
        tf.get_logger().info("Using model:\n%s", self._model)
        self._optimizer = None
        self._config = copy.deepcopy(config)
        self._config["data"] = config_util.try_prefix_paths(
            self._config["model_dir"], config["data"]
        )
        if auto_config is None:
            auto_config = self._config.get("auto_config", False)
        self._auto_config = auto_config
        self._mixed_precision = mixed_precision
        self._jit_compile = jit_compile
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
            tf.random.set_seed(seed)
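
    # Example (illustrative sketch, not part of the library): constructing a runner
    # from a catalog model and a YAML run configuration. "config.yml" and the paths
    # it references are placeholders.
    #
    #     import yaml
    #     from opennmt import Runner, models
    #
    #     with open("config.yml") as config_file:
    #         config = yaml.safe_load(config_file)
    #     runner = Runner(models.TransformerBase(), config, auto_config=True, seed=42)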

    @property
    def model(self):
        """The :class:`opennmt.models.Model` executed by this runner."""
        return self._model

    @property
    def model_dir(self):
        """The active model directory."""
        return self._config["model_dir"]

    def _finalize_config(self, training=False, num_replicas=1, num_devices=1):
        # Configuration priority: user config > auto config > default config.
        config = copy.deepcopy(_CONFIG_FALLBACK)
        if self._auto_config:
            model_config = self._model.auto_config(num_replicas=num_replicas)
            if not model_config:
                raise NotImplementedError(
                    "This model does not define any automatic configuration values"
                )
            config_util.merge_config(config, model_config)
        config_util.merge_config(config, self._config)

        config["params"].setdefault("num_hypotheses", config["infer"].get("n_best", 1))
        config["params"].setdefault(
            "average_loss_in_time", config["train"]["batch_type"] == "tokens"
        )

        if training:
            train_config = config["train"]
            batch_size = train_config.get("batch_size")

            # Auto tune batch size.
            if batch_size is None or batch_size == 0:
                if train_config["batch_type"] == "examples":
                    min_batch_size = 1
                    max_batch_size = 512
                    min_range = 16
                else:
                    min_batch_size = 256
                    max_batch_size = 16384
                    min_range = 256
                if train_config.get("effective_batch_size") is not None:
                    max_batch_size = min(
                        max_batch_size, train_config["effective_batch_size"]
                    )
                train_config["batch_size"] = _auto_tune_batch_size(
                    config,
                    min_batch_size=min_batch_size,
                    max_batch_size=max_batch_size,
                    min_range=min_range,
                    num_devices=num_devices,
                    scaling_factor=train_config.get("batch_size_autotune_scale", 0.7),
                    mixed_precision=self._mixed_precision,
                    timeout=train_config.get("batch_size_autotune_timeout", 15 * 60),
                )

        tf.get_logger().info(
            "Using parameters:\n%s",
            yaml.dump(config, indent=2, default_flow_style=False),
        )
        return config

    def _init_model(self, config):
        model = self._model_fn()
        model.initialize(config["data"], params=config["params"])
        model.set_jit_compile(self._jit_compile)
        return model

    def train(
        self,
        num_devices=1,
        with_eval=False,
        checkpoint_path=None,
        hvd=None,
        return_summary=False,
        fallback_to_cpu=True,
        continue_from_checkpoint=False,
    ):
        """Runs the training loop.

        Args:
          num_devices: Number of devices to use for training.
          with_eval: Enable evaluation during training.
          checkpoint_path: The checkpoint path to load the model weights from.
          hvd: Optional Horovod module.
          return_summary: Return a summary of the training from this function.
          fallback_to_cpu: If no GPU is detected, allow the training to run on CPU.
          continue_from_checkpoint: Continue training from the checkpoint passed to
            :obj:`checkpoint_path`. Otherwise only the model weights are loaded.

        Returns:
          The path to the final model directory and, if :obj:`return_summary` is set,
          a dictionary with various training statistics.
        """
        if hvd is None:
            num_replicas = num_devices
            is_master = True
        else:
            if num_devices > 1:
                raise ValueError(
                    "num_devices (or num_gpus) should be set to 1 when using Horovod"
                )
            num_replicas = hvd.size()
            is_master = hvd.rank() == 0

        devices = misc.get_devices(count=num_devices, fallback_to_cpu=fallback_to_cpu)
        config = self._finalize_config(
            training=True, num_replicas=num_replicas, num_devices=num_devices
        )

        mixed_precision = self._mixed_precision and misc.enable_mixed_precision()
        model = self._init_model(config)
        optimizer = model.get_optimizer()

        data_config = config["data"]
        train_config = config["train"]
        eval_config = config["eval"]

        batch_type = train_config["batch_type"]
        batch_size = train_config["batch_size"]
        batch_size_multiple = (
            8
            if batch_type == "tokens" and (mixed_precision or self._jit_compile)
            else 1
        )
        batch_autotune_mode = train_config.get("batch_autotune_mode")

        length_bucket_width = train_config["length_bucket_width"]
        pad_to_bucket_boundary = train_config.get("pad_to_bucket_boundary")
        if self._jit_compile:
            length_bucket_width = max(length_bucket_width, batch_size_multiple)
            pad_to_bucket_boundary = True

        dataset_fn = (
            lambda input_context: model.examples_inputter.make_training_dataset(
                data_config["train_features_file"],
                data_config.get("train_labels_file"),
                batch_size,
                batch_type=batch_type,
                batch_size_multiple=batch_size_multiple,
                shuffle_buffer_size=train_config["sample_buffer_size"],
                length_bucket_width=length_bucket_width,
                pad_to_bucket_boundary=pad_to_bucket_boundary,
                maximum_features_length=train_config.get("maximum_features_length"),
                maximum_labels_length=train_config.get("maximum_labels_length"),
                single_pass=train_config.get("single_pass", False),
                num_shards=input_context.num_input_pipelines,
                shard_index=input_context.input_pipeline_id,
                prefetch_buffer_size=train_config.get("prefetch_buffer_size"),
                cardinality_multiple=input_context.num_replicas_in_sync,
                weights=data_config.get("train_files_weights"),
                batch_autotune_mode=batch_autotune_mode,
            )
        )

        checkpoint = None
        evaluator = None
        if is_master:
            checkpoint = checkpoint_util.Checkpoint.from_config(
                config, model, optimizer=optimizer
            )
            checkpoint.restore(
                checkpoint_path=checkpoint_path,
                weights_only=(
                    checkpoint_path is not None and not continue_from_checkpoint
                ),
            )
            if with_eval:
                evaluator = evaluation.Evaluator.from_config(model, config)

        # Set gradients accumulation based on the requested effective batch size.
        effective_batch_size = train_config.get("effective_batch_size")
        if effective_batch_size is not None:
            accum_steps = _count_batch_accum(
                batch_size,
                effective_batch_size,
                num_replicas=num_replicas,
            )
            if batch_autotune_mode and accum_steps > 2:
                # When autotuning the batch size, the memory usage should be the same
                # whether we are accumulating 2 steps or N steps.
                accum_steps = 2
                effective_batch_size = batch_size * num_replicas * accum_steps
            tf.get_logger().info(
                "Accumulate gradients of %d iterations to reach effective batch size of %d",
                accum_steps,
                effective_batch_size,
            )
        else:
            accum_steps = 1

        if hvd is not None:
            trainer = training_util.HorovodTrainer(
                model, optimizer, hvd, checkpoint=checkpoint
            )
        elif num_devices > 1:
            trainer = training_util.MirroredStrategyTrainer(
                model, optimizer, checkpoint=checkpoint, devices=devices
            )
        else:
            trainer = training_util.Trainer(model, optimizer, checkpoint=checkpoint)

        summary = trainer(
            dataset_fn,
            max_step=train_config.get("max_step"),
            accum_steps=accum_steps,
            report_steps=train_config.get("save_summary_steps", 100),
            save_steps=train_config.get("save_checkpoints_steps", 5000),
            evaluator=evaluator,
            eval_steps=eval_config.get("steps", 5000),
            moving_average_decay=train_config.get("moving_average_decay"),
        )

        average_last_checkpoints = train_config.get("average_last_checkpoints", 0)
        if checkpoint is None:
            output_dir = None
        elif average_last_checkpoints > 0:
            output_dir = self.average_checkpoints(
                os.path.join(checkpoint.model_dir, "avg"),
                max_count=average_last_checkpoints,
            )
        else:
            output_dir = checkpoint.model_dir

        if mixed_precision:
            misc.disable_mixed_precision()

        if return_summary:
            return output_dir, summary
        return output_dir
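
    # Example (illustrative sketch): launching training with in-training evaluation
    # on a single GPU and reading back the output directory and training summary.
    # The keyword values below are placeholders.
    #
    #     output_dir, summary = runner.train(
    #         num_devices=1, with_eval=True, return_summary=True
    #     )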

    def evaluate(self, features_file=None, labels_file=None, checkpoint_path=None):
        """Runs evaluation.

        Args:
          features_file: The input features file to evaluate. If not set, will load
            ``eval_features_file`` from the data configuration.
          labels_file: The output labels file to evaluate. If not set, will load
            ``eval_labels_file`` from the data configuration.
          checkpoint_path: The checkpoint path to load the model weights from.

        Returns:
          A dict of evaluation metrics.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint_path = checkpoint.restore(
            checkpoint_path=checkpoint_path, weights_only=True
        )
        step = checkpoint_util.get_step_from_checkpoint_prefix(checkpoint_path)
        evaluator = evaluation.Evaluator.from_config(
            model, config, features_file=features_file, labels_file=labels_file
        )
        return evaluator(step)
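
    # Example (illustrative sketch): evaluating the latest checkpoint on the files
    # declared under "eval_features_file"/"eval_labels_file" in the data
    # configuration. The exact metric keys depend on the model and configuration.
    #
    #     metrics = runner.evaluate()
    #     print(metrics)  # e.g. a dict such as {"loss": ..., "perplexity": ...}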

    def average_checkpoints(self, output_dir, max_count=8, checkpoint_paths=None):
        """Averages checkpoints.

        Args:
          output_dir: The directory that will contain the averaged checkpoint.
          max_count: The maximum number of checkpoints to average.
          checkpoint_paths: The list of checkpoints to average. If not set, the last
            :obj:`max_count` checkpoints of the current model directory are averaged.

        Returns:
          The path to the directory containing the averaged checkpoint.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        optimizer = model.get_optimizer()
        checkpoint = checkpoint_util.Checkpoint.from_config(
            config, model, optimizer=optimizer
        )
        checkpoint.restore()
        model.create_variables(optimizer=optimizer)
        trackables = dict(model=model, optimizer=optimizer)
        output_dir = checkpoint_util.average_checkpoints(
            self.model_dir if checkpoint_paths is None else checkpoint_paths,
            output_dir,
            trackables,
            max_count=max_count,
        )
        _forward_model_description(self.model_dir, output_dir)
        self._config["model_dir"] = output_dir
        return output_dir
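
    # Example (illustrative sketch): averaging the last 5 checkpoints of the current
    # model directory into a new directory (the path below is a placeholder).
    #
    #     avg_dir = runner.average_checkpoints("run/avg", max_count=5)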

    def update_vocab(self, output_dir, src_vocab=None, tgt_vocab=None):
        """Updates model vocabularies.

        Args:
          output_dir: Directory where the updated checkpoint will be saved.
          src_vocab: Path to the new source vocabulary.
          tgt_vocab: Path to the new target vocabulary.

        Returns:
          Path to the new checkpoint directory.
        """
        if not isinstance(self._model, models.SequenceToSequence):
            raise ValueError(
                "Updating vocabularies is only supported for sequence to sequence models"
            )
        config = self._finalize_config()
        if src_vocab is None and tgt_vocab is None:
            return config["model_dir"]

        model = self._init_model(config)
        optimizer = model.get_optimizer()
        cur_checkpoint = checkpoint_util.Checkpoint.from_config(
            config, model, optimizer=optimizer
        )
        cur_checkpoint.restore()
        model.create_variables(optimizer=optimizer)

        source_dir = self.model_dir
        self._config["model_dir"] = output_dir
        if src_vocab is not None:
            self._config["data"]["source_vocabulary"] = src_vocab
        if tgt_vocab is not None:
            self._config["data"]["target_vocabulary"] = tgt_vocab
        new_config = self._finalize_config()
        new_model = self._init_model(new_config)
        new_optimizer = new_model.get_optimizer()
        new_checkpoint = checkpoint_util.Checkpoint.from_config(
            new_config, new_model, optimizer=new_optimizer
        )
        new_model.create_variables(optimizer=new_optimizer)

        model.transfer_weights(
            new_model, new_optimizer=new_optimizer, optimizer=optimizer
        )
        new_optimizer.iterations.assign(optimizer.iterations)
        new_checkpoint.save()
        _forward_model_description(source_dir, output_dir)
        return output_dir
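
    # Example (illustrative sketch): switching to extended vocabularies while keeping
    # the trained weights of existing entries. The paths below are placeholders.
    #
    #     new_dir = runner.update_vocab(
    #         "run/updated_vocab",
    #         src_vocab="data/src_vocab_extended.txt",
    #         tgt_vocab="data/tgt_vocab_extended.txt",
    #     )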

    def infer(
        self,
        features_file,
        predictions_file=None,
        checkpoint_path=None,
        log_time=False,
    ):
        """Runs inference.

        Args:
          features_file: The file(s) to infer from.
          predictions_file: If set, predictions are saved in this file, otherwise
            they are printed on the standard output.
          checkpoint_path: Path to a specific checkpoint to load. If ``None``, the
            latest is used.
          log_time: If ``True``, several time metrics will be printed in the logs at
            the end of the inference loop.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint.restore(checkpoint_path=checkpoint_path, weights_only=True)
        infer_config = config["infer"]
        dataset = model.examples_inputter.make_inference_dataset(
            features_file,
            infer_config["batch_size"],
            batch_type=infer_config["batch_type"],
            length_bucket_width=infer_config["length_bucket_width"],
            prefetch_buffer_size=infer_config.get("prefetch_buffer_size"),
        )
        inference.predict_dataset(
            model,
            dataset,
            print_params=infer_config,
            predictions_file=predictions_file,
            log_time=log_time,
        )
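
    # Example (illustrative sketch): translating a test file with the latest
    # checkpoint and writing the predictions to disk. The file names are placeholders.
    #
    #     runner.infer("data/test.src", predictions_file="data/test.out", log_time=True)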

    def export(self, export_dir, checkpoint_path=None, exporter=None):
        """Exports a model.

        Args:
          export_dir: The export directory.
          checkpoint_path: The checkpoint path to export. If ``None``, the latest is
            used.
          exporter: A :class:`opennmt.utils.Exporter` instance. Defaults to
            :class:`opennmt.utils.SavedModelExporter`.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        path_restored = checkpoint.restore(
            checkpoint_path=checkpoint_path, weights_only=True
        )
        # If no checkpoint is found, raise an error instead of exporting an
        # untrained model.
        if path_restored is None:
            raise AttributeError("Checkpoint path to restore was not found")
        model.export(export_dir, exporter=exporter)
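
    # Example (illustrative sketch): exporting the latest checkpoint with the default
    # SavedModel exporter. The export directory below is a placeholder.
    #
    #     runner.export("export/1")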

    def score(
        self,
        features_file,
        predictions_file=None,
        checkpoint_path=None,
        output_file=None,
    ):
        """Scores existing predictions.

        Args:
          features_file: The input file.
          predictions_file: The predictions file to score.
          checkpoint_path: Path to specific checkpoint to load. If ``None``, the
            latest is used.
          output_file: The file where the scores are saved. Otherwise, they will be
            printed on the standard output.
        """
        config = self._finalize_config()
        model = self._init_model(config)
        checkpoint = checkpoint_util.Checkpoint.from_config(config, model)
        checkpoint.restore(checkpoint_path=checkpoint_path, weights_only=True)
        score_config = config["score"]
        dataset = model.examples_inputter.make_evaluation_dataset(
            features_file,
            predictions_file,
            score_config["batch_size"],
            batch_type=score_config["batch_type"],
            length_bucket_width=score_config["length_bucket_width"],
            prefetch_buffer_size=score_config.get("prefetch_buffer_size"),
        )
        inference.score_dataset(
            model, dataset, print_params=score_config, output_file=output_file
        )
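
    # Example (illustrative sketch): computing scores for existing translations with
    # the latest checkpoint. The file names below are placeholders.
    #
    #     runner.score("data/test.src", predictions_file="data/test.out")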


def _forward_model_description(source, destination):
    source = os.path.join(source, MODEL_DESCRIPTION_FILENAME)
    if os.path.isfile(source):
        if not os.path.isdir(destination):
            os.makedirs(destination)
        destination = os.path.join(destination, MODEL_DESCRIPTION_FILENAME)
        shutil.copyfile(source, destination)


def _count_batch_accum(batch_size, target_batch_size, num_replicas=1):
    """Given the current batch size, the number of replicas, and the requested
    effective batch size, returns the number of gradients to accumulate.
    """
    return int(math.ceil(float(target_batch_size) / (batch_size * num_replicas)))


def _auto_tune_batch_size(
    config,
    min_batch_size,
    max_batch_size,
    min_range,
    sample_iterations=5,
    num_devices=1,
    scaling_factor=0.7,
    mixed_precision=False,
    timeout=15 * 60,
):
    """Finds the largest token-based batch size that can be used with this
    configuration.

    This function runs some training iterations and uses out-of-memory errors as
    search conditions. A binary search is used to converge to a suitable batch size.

    We prefer to run the iterations in a different process so that it does not alter
    the current context (OOM may not be safe to recover from, see for example
    https://stackoverflow.com/q/53820713/2529808).

    Args:
      config: The training configuration.
      min_batch_size: The smallest batch size to consider.
      max_batch_size: The largest batch size to consider.
      min_range: Continue searching while the difference between
        :obj:`max_batch_size` and :obj:`min_batch_size` is larger than this value.
      sample_iterations: The number of training iterations.
      num_devices: The number of devices to use.
      scaling_factor: Scale the found batch size by this value.
      mixed_precision: If ``True``, run the autotuning with mixed precision.
      timeout: Consider the training attempt as failed after this many seconds.

    Returns:
      The autotuned batch size.
    """
    tf.get_logger().info(
        "Searching the largest batch size between %d and %d with a precision of %d...",
        min_batch_size,
        max_batch_size,
        min_range,
    )

    model_description = os.path.join(config["model_dir"], MODEL_DESCRIPTION_FILENAME)
    absolute_min_batch_size = min_batch_size
    stderr_data = None

    while max_batch_size - min_batch_size > min_range:
        batch_size = (max_batch_size + min_batch_size) // 2

        with tempfile.TemporaryDirectory() as tmpdir:
            run_config = copy.deepcopy(config)
            run_config["model_dir"] = tmpdir
            run_config["train"]["batch_autotune_mode"] = True
            run_config["train"]["batch_size"] = batch_size
            run_config["train"]["save_checkpoints_steps"] = None
            run_config["train"]["average_last_checkpoints"] = 0
            run_config["train"]["max_step"] = sample_iterations
            config_path = os.path.join(tmpdir, "batch_size_autotuner.yml")
            with tf.io.gfile.GFile(config_path, mode="w") as config_file:
                yaml.dump(run_config, config_file)

            env = os.environ.copy()
            env["TF_CPP_MIN_LOG_LEVEL"] = "2"
            args = [
                sys.executable or "python",
                "-m",
                "opennmt.bin.main",
                "--log_level",
                "ERROR",
                "--config",
                config_path,
                "--model",
                model_description,
            ]
            if mixed_precision:
                args.extend(["--mixed_precision"])
            args.extend(
                [
                    "train",
                    "--num_gpus",
                    str(num_devices),
                ]
            )

            tf.get_logger().info("Trying training with batch size %d...", batch_size)
            with open(os.devnull, "w") as devnull:
                process = subprocess.Popen(
                    args,
                    stdout=devnull,
                    stderr=subprocess.PIPE,
                    env=env,
                )
                try:
                    _, stderr_data = process.communicate(timeout=timeout)
                except subprocess.TimeoutExpired:
                    process.kill()
                    tf.get_logger().info("... failed (timeout).")
                    max_batch_size = batch_size - 1
                else:
                    if process.returncode != 0:
                        tf.get_logger().info("... failed.")
                        max_batch_size = batch_size - 1
                    else:
                        tf.get_logger().info(
                            "... succeeded, continue until the search range is smaller than %d.",
                            min_range,
                        )
                        min_batch_size = batch_size

    if min_batch_size == absolute_min_batch_size:
        if stderr_data is not None:
            tf.get_logger().error(
                'Last training attempt exited with an error:\n\n"""\n%s"""\n'
                % stderr_data.decode("utf-8")
            )
        raise RuntimeError(
            "Batch size autotuning failed: all training attempts exited with an error "
            "(see last error above). Either there is not enough memory to train this "
            "model, or unexpected errors occurred. Please try to set a fixed batch size "
            "in the training configuration."
        )

    batch_size = max(int(scaling_factor * min_batch_size), absolute_min_batch_size)
    tf.get_logger().info("Batch size auto tuned to %d.", batch_size)
    return batch_size