"""Define base tokenizers."""

import abc
import json
import sys

import tensorflow as tf
import yaml

from opennmt.utils import misc

[docs]class Tokenizer(abc.ABC): """Base class for tokenizers.""" @property def in_graph(self): """Returns ``True`` if this tokenizer can be run in graph (i.e. uses TensorFlow ops).""" return False
[docs] def export_assets(self, asset_dir, asset_prefix=""): """Exports assets for this tokenizer. Args: asset_dir: The directory where assets can be written. asset_prefix: The prefix to attach to assets filename. Returns: A dictionary containing additional assets used by the tokenizer. """ return {}
[docs] def tokenize_stream( self, input_stream=sys.stdin, output_stream=sys.stdout, delimiter=" ", training=True, ): """Tokenizes a stream of sentences. Args: input_stream: The input stream. output_stream: The output stream. delimiter: The token delimiter to use for text serialization. training: Set to ``False`` to tokenize for inference. """ for line in input_stream: line = line.strip() tokens = self.tokenize(line, training=training) merged_tokens = delimiter.join(tokens) misc.print_as_bytes(merged_tokens, stream=output_stream)
[docs] def detokenize_stream( self, input_stream=sys.stdin, output_stream=sys.stdout, delimiter=" ", ): """Detokenizes a stream of sentences. Args: input_stream: The input stream. output_stream: The output stream. delimiter: The token delimiter used for text serialization. """ for line in input_stream: tokens = line.strip().split(delimiter) string = self.detokenize(tokens) misc.print_as_bytes(string, stream=output_stream)
[docs] def tokenize(self, text, training=True): """Tokenizes text. Args: text: A string or batch of strings to tokenize as a ``tf.Tensor`` or Python values. training: Set to ``False`` to tokenize for inference. Returns: - If :obj:`text` is a Python string, a list of Python strings. - If :obj:`text` is a list of Python strings, a list of list of Python strings. - If :obj:`text` is a 0-D ``tf.Tensor``, a 1-D ``tf.Tensor``. - If :obj:`text` is a 1-D ``tf.Tensor``, a 2-D ``tf.RaggedTensor``. Raises: ValueError: if the rank of :obj:`text` is greater than 1. """ with tf.device("cpu:0"): return self._tokenize(text, training)
def _tokenize(self, text, training): if tf.is_tensor(text): return self._tokenize_tensor(text, training) elif isinstance(text, list): return list(map(lambda t: self._tokenize(t, training), text)) else: text = tf.compat.as_text(text) return self._tokenize_string(text, training)
[docs] def detokenize(self, tokens, sequence_length=None): """Detokenizes tokens. The Tensor version supports batches of tokens. Args: tokens: Tokens or batch of tokens as a ``tf.Tensor``, ``tf.RaggedTensor``, or Python values. sequence_length: The length of each sequence. Required if :obj:`tokens` is a dense 2-D ``tf.Tensor``. Returns: - If :obj:`tokens` is a list of list of Python strings, a list of Python strings. - If :obj:`tokens` is a list of Python strings, a Python string. - If :obj:`tokens` is a N-D ``tf.Tensor`` (or ``tf.RaggedTensor``), a (N-1)-D ``tf.Tensor``. Raises: ValueError: if the rank of :obj:`tokens` is greater than 2. ValueError: if :obj:`tokens` is a 2-D dense ``tf.Tensor`` and :obj:`sequence_length` is not set. """ with tf.device("cpu:0"): return self._detokenize(tokens, sequence_length)
def _detokenize(self, tokens, sequence_length): if isinstance(tokens, tf.RaggedTensor): return self._detokenize_tensor(tokens) elif tf.is_tensor(tokens): rank = len(tokens.shape) if rank == 1: return self._detokenize_tensor(tokens) elif rank == 2: if sequence_length is None: raise ValueError( "sequence_length is required for Tensor detokenization" ) tokens = tf.RaggedTensor.from_tensor(tokens, lengths=sequence_length) return self._detokenize_tensor(tokens) else: raise ValueError("Unsupported tensor rank %d for detokenization" % rank) elif isinstance(tokens, list) and tokens and isinstance(tokens[0], list): return list(map(self.detokenize, tokens)) else: tokens = [tf.compat.as_text(token) for token in tokens] return self._detokenize_string(tokens) def _tokenize_tensor(self, text, training): """Tokenizes a tensor. When not overriden, this default implementation calls the string-based tokenization. Args: text: A 0-D or 1-D string ``tf.Tensor``. training: Set to ``False`` to tokenize for inference. Returns: A 1-D string ``tf.Tensor``, or a 2-D string ``tf.RaggedTensor`` if the input was a batch of text. """ def _python_wrapper(string_t): string = tf.compat.as_text(string_t.numpy()) tokens = self._tokenize_string(string, training) return tf.constant(tokens, dtype=tf.string) def _python_wrapper_batch(batch_text): batch_text = list(map(tf.compat.as_text, batch_text.numpy())) batch_tokens = self._tokenize_string_batch(batch_text, training) flat_tokens = tf.constant(tf.nest.flatten(batch_tokens), dtype=tf.string) lengths = tf.constant(list(map(len, batch_tokens)), dtype=tf.int32) return flat_tokens, lengths rank = text.shape.rank if rank == 0: tokens = tf.py_function(_python_wrapper, [text], tf.string) tokens.set_shape([None]) return tokens elif rank == 1: flat_tokens, lengths = tf.py_function( _python_wrapper_batch, [text], (tf.string, tf.int32) ) flat_tokens.set_shape([None]) lengths.set_shape([None]) return tf.RaggedTensor.from_row_lengths(flat_tokens, lengths) else: raise ValueError("Unsupported tensor rank %d for tokenization" % rank) def _detokenize_tensor(self, tokens): """Detokenizes tokens. When not overriden, this default implementation calls the string-based detokenization. Args: tokens: A 1-D string ``tf.Tensor``, or a 2-D string ``tf.RaggedTensor``. Returns: A 0-D string ``tf.Tensor``, or a 1-D string ``tf.Tensor`` if the input was a batch of tokens. """ def _python_wrapper(tokens_t): tokens = [tf.compat.as_text(s) for s in tokens_t.numpy()] string = self._detokenize_string(tokens) return tf.constant(string) rank = tokens.shape.rank if rank == 1: text = tf.py_function(_python_wrapper, [tokens], tf.string) text.set_shape([]) return text elif rank == 2: return tf.map_fn(self._detokenize_tensor, tokens, dtype=tf.string) else: raise ValueError("Unsupported tensor rank %d for detokenization" % rank) @abc.abstractmethod def _tokenize_string(self, text, training): """Tokenizes a Python unicode string. This method should be thread-safe. Args: text: A Python unicode string. training: Set to ``False`` to tokenize for inference. Returns: A list of Python unicode strings. """ raise NotImplementedError() def _tokenize_string_batch(self, batch_text, training): """Tokenizes a batch of Python unicode strings. Args: batch_text: A list of Python unicode strings. training: Set to ``False`` to tokenize for inference. Returns: A list of lists of Python unicode strings. """ return [self._tokenize_string(text, training) for text in batch_text] @abc.abstractmethod def _detokenize_string(self, tokens): """Detokenizes tokens. Args: tokens: A list of Python unicode strings. Returns: A unicode Python string. """ raise NotImplementedError()
_TOKENIZERS_REGISTRY = misc.ClassRegistry(base_class=Tokenizer) register_tokenizer = _TOKENIZERS_REGISTRY.register
[docs]def make_tokenizer(config=None): """Creates a tokenizer instance from the configuration. Args: config: Tokenization configuration as a Python dictionary, or a path to a YAML configuration file, or a JSON string. Returns: A :class:`opennmt.tokenizers.Tokenizer` instance. Raises: ValueError: if :obj:`config` is invalid. """ if config: if isinstance(config, str): if with as config_file: config = yaml.safe_load(config_file) else: try: config = json.loads(config) except json.JSONDecodeError: pass if isinstance(config, dict): tokenizer_type = config.get("type") if tokenizer_type is None: tokenizer_type = "OpenNMTTokenizer" tokenizer_params = config else: tokenizer_params = config.get("params", {}) tokenizer_class = _TOKENIZERS_REGISTRY.get(tokenizer_type) if tokenizer_class is None: raise ValueError( "%s is not in list of accepted tokenizers: %s" % ( tokenizer_type, ", ".join(sorted(_TOKENIZERS_REGISTRY.class_names)), ) ) tokenizer = tokenizer_class(**tokenizer_params) else: raise ValueError("Invalid tokenization configuration: %s" % str(config)) else: # If the tokenization was not configured, we assume that an external tokenization # was used and we don't include the tokenizer in the exported graph. tokenizer = SpaceTokenizer(in_graph=False) return tokenizer
def _process_stream_as_dataset( input_stream, output_stream, map_func, batch_size=512, num_parallel_calls=4, ): dataset = lambda: input_stream, output_types=tf.string, output_shapes=tf.TensorShape([]), ) dataset = dataset.batch(batch_size) dataset =, num_parallel_calls=num_parallel_calls) expected_spec = tf.TensorSpec(shape=[None], dtype=tf.string) if dataset.element_spec != expected_spec: raise TypeError( "Expected map_func to produce elements with spec %s, but got spec %s instead" % (expected_spec, dataset.element_spec) ) for lines in dataset.as_numpy_iterator(): for line in lines: misc.print_as_bytes(line, stream=output_stream) class TensorFlowTokenizer(tf.Module, Tokenizer): """Base class for tokenizers using only TensorFlow ops.""" @property def in_graph(self): return True def tokenize_stream( self, input_stream=sys.stdin, output_stream=sys.stdout, delimiter=" ", training=True, ): def _map_func(line): line = tf.strings.strip(line) tokens = self._tokenize_tensor(line, training) return tf.strings.reduce_join( tokens, axis=tokens.shape.rank - 1, separator=delimiter ) _process_stream_as_dataset(input_stream, output_stream, _map_func) def detokenize_stream( self, input_stream=sys.stdin, output_stream=sys.stdout, delimiter=" ", ): def _map_func(line): line = tf.strings.strip(line) tokens = tf.strings.split(line, sep=delimiter) return self._detokenize_tensor(tokens) _process_stream_as_dataset(input_stream, output_stream, _map_func) @abc.abstractmethod def _tokenize_tensor(self, text, training): raise NotImplementedError() @abc.abstractmethod def _detokenize_tensor(self, tokens): raise NotImplementedError() def _tokenize_string(self, text, training): tokens = self._tokenize_tensor(tf.constant(text, dtype=tf.string), training) return [token.decode("utf-8") for token in tokens.numpy()] def _detokenize_string(self, tokens): text = self._detokenize_tensor(tf.constant(tokens, dtype=tf.string)) return text.numpy().decode("utf-8")
[docs]@register_tokenizer class SpaceTokenizer(Tokenizer): """A tokenizer that splits on spaces."""
[docs] def __init__(self, in_graph=True): """Initializes the tokenizer. Args: in_graph: If ``True``, this tokenizer should be integrated in the exported graph. """ self._in_graph = in_graph
@property def in_graph(self): return self._in_graph def _tokenize_tensor(self, text, _): return tf.strings.split(text) def _detokenize_tensor(self, tokens): return tf.strings.reduce_join(tokens, axis=tokens.shape.rank - 1, separator=" ") def _tokenize_string(self, text, _): return text.split() def _detokenize_string(self, tokens): return " ".join(tokens)
[docs]@register_tokenizer class CharacterTokenizer(Tokenizer): """A tokenizer that splits unicode characters.""" @property def in_graph(self): return True def _tokenize_tensor(self, text, _): text = tf.strings.regex_replace(text, " ", "▁") return tf.strings.unicode_split(text, "UTF-8") def _detokenize_tensor(self, tokens): text = tf.strings.reduce_join(tokens, axis=tokens.shape.rank - 1) return tf.strings.regex_replace(text, "▁", " ") def _tokenize_string(self, text, _): return list(text.replace(" ", "▁")) def _detokenize_string(self, tokens): return "".join(tokens).replace("▁", " ")