Source code for onmt.inputters.text_corpus

"""Module that contain shard utils for dynamic data."""
import os
from onmt.utils.logging import logger
from onmt.constants import CorpusName, CorpusTask
from onmt.transforms import TransformPipe
from onmt.inputters.text_utils import process, parse_features, append_features_to_text
from contextlib import contextmanager
import itertools


@contextmanager
def exfile_open(filename, *args, **kwargs):
    """Extended file opener enables open(filename=None).

    This context manager enables open(filename=None) as well as regular file.
    filename None will produce endlessly None for each iterate,
    while filename with valid path will produce lines as usual.

    Args:
        filename (str|None): a valid file path or None;
        *args: args relate to open file using codecs;
        **kwargs: kwargs relate to open file using codecs.

    Yields:
        `None` repeatly if filename==None,
        else yield from file specified in `filename`.
    """
    if filename is None:
        from itertools import repeat

        _file = repeat(None)
    else:
        import codecs

        _file = codecs.open(filename, *args, **kwargs)
    yield _file
    if filename is not None and _file:
        _file.close()

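# Usage sketch (illustrative, not part of the original module): the optional
# alignment file may be absent, yet `zip` still works uniformly over both
# streams. The file name "train.src" is hypothetical.
def _demo_exfile_open():
    with exfile_open("train.src", mode="rb") as fs, exfile_open(
        None, mode="rb"
    ) as fa:
        for sline, align in zip(fs, fa):
            assert align is None  # a None filename yields None per line
            _ = sline.decode("utf-8")
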

class BlockwiseCorpus(object):
    """A corpus class for reading a single file block by block."""

    def __init__(self, name, file_path, block_size=4096):
        """Initialize file path and block size."""
        self.id = name
        self.file_path = file_path
        self.block_size = block_size

    def load(self, offset=0, stride=1):
        """
        Load file and iterate by blocks.
        `offset` and `stride` allow iterating only on every
        `stride` block, starting from `offset`.
        """

        def make_ex(block_content):
            example = {
                "src": block_content,
                "tgt": block_content,
                "src_original": block_content,
                "tgt_original": block_content,
            }
            return example

        with open(self.file_path, mode="r", encoding="utf-8") as file:
            block_content = ""
            block_index = 0

            while True:
                chunk = file.read(self.block_size)
                if not chunk:
                    break

                # Pick every `stride`-th block, starting at `offset`.
                if block_index % stride == offset:
                    block_content += chunk

                    if len(chunk) < self.block_size:
                        # Reached end of file
                        yield make_ex(block_content)
                        break

                    if len(block_content) >= self.block_size:
                        yield make_ex(block_content)
                        block_content = ""
                block_index += 1

    def __str__(self):
        cls_name = type(self).__name__
        return (
            f"{cls_name}({self.id}, {self.file_path}, "
            f"align={None}, "
            f"n_src_feats={0}, "
            f'src_feats_defaults="{None}")'
        )

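# Usage sketch (illustrative): read a raw text file in fixed-size character
# blocks for LM-style training, where src and tgt carry the same block.
# The path "corpus.txt" is hypothetical.
def _demo_blockwise_corpus():
    corpus = BlockwiseCorpus("pretrain", "corpus.txt", block_size=4096)
    for example in corpus.load(offset=0, stride=1):
        print(len(example["src"]))  # roughly block_size characters per example
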

class ParallelCorpus(object):
    """A parallel corpus file pair that can be loaded to iterate."""

    def __init__(
        self, name, src, tgt, align=None, n_src_feats=0, src_feats_defaults=None
    ):
        """Initialize src & tgt side file path."""
        self.id = name
        self.src = src
        self.tgt = tgt
        self.align = align
        self.n_src_feats = n_src_feats
        self.src_feats_defaults = src_feats_defaults

    def load(self, offset=0, stride=1):
        """
        Load file and iterate by lines.
        `offset` and `stride` allow iterating only on every
        `stride` example, starting from `offset`.
        """

        def make_ex(sline, tline, align):
            sline, sfeats = parse_features(
                sline,
                n_feats=self.n_src_feats,
                defaults=self.src_feats_defaults,
            )
            # 'src_original' and 'tgt_original' store the
            # original line before tokenization. These
            # fields are used later on in the feature
            # transforms.
            example = {
                "src": sline,
                "tgt": tline,
                "src_original": sline,
                "tgt_original": tline,
            }
            if align is not None:
                example["align"] = align
            if sfeats is not None:
                example["src_feats"] = [f for f in sfeats]
            return example

        if isinstance(self.src, list):
            fs = self.src
            ft = [] if self.tgt is None else self.tgt
            fa = [] if self.align is None else self.align
            for i, (sline, tline, align) in enumerate(
                itertools.zip_longest(fs, ft, fa)
            ):
                # Pick every `stride`-th example, starting at `offset`.
                if i % stride == offset:
                    yield make_ex(sline, tline, align)
        else:
            with exfile_open(self.src, mode="rb") as fs, exfile_open(
                self.tgt, mode="rb"
            ) as ft, exfile_open(self.align, mode="rb") as fa:
                for i, (sline, tline, align) in enumerate(zip(fs, ft, fa)):
                    if i % stride == offset:
                        if tline is not None:
                            tline = tline.decode("utf-8")
                        if align is not None:
                            align = align.decode("utf-8")
                        yield make_ex(sline.decode("utf-8"), tline, align)

    def __str__(self):
        cls_name = type(self).__name__
        return (
            f"{cls_name}({self.id}, {self.src}, {self.tgt}, "
            f"align={self.align}, "
            f"n_src_feats={self.n_src_feats}, "
            f'src_feats_defaults="{self.src_feats_defaults}")'
        )
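

# Usage sketch (illustrative): stream a src/tgt file pair, keeping every
# second example starting at index 0, as a data-parallel worker would.
# File names are hypothetical.
def _demo_parallel_corpus():
    corpus = ParallelCorpus("demo", "train.src", "train.tgt")
    for example in corpus.load(offset=0, stride=2):
        print(example["src"], "->", example["tgt"])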


def get_corpora(opts, task=CorpusTask.TRAIN, src=None, tgt=None, align=None):
    """Return a dict of corpus objects for the given task, keyed by corpus id."""
    corpora_dict = {}
    if task == CorpusTask.TRAIN:
        for corpus_id, corpus_dict in opts.data.items():
            if corpus_id != CorpusName.VALID:
                if corpus_dict.get("path_txt", None) is None:
                    corpora_dict[corpus_id] = ParallelCorpus(
                        corpus_id,
                        corpus_dict["path_src"],
                        corpus_dict["path_tgt"],
                        corpus_dict["path_align"],
                        n_src_feats=opts.n_src_feats,
                        src_feats_defaults=opts.src_feats_defaults,
                    )
                else:
                    corpora_dict[corpus_id] = BlockwiseCorpus(
                        corpus_id,
                        corpus_dict["path_txt"],
                        block_size=8192,  # number of characters
                    )
    elif task == CorpusTask.VALID:
        if CorpusName.VALID in opts.data.keys():
            corpora_dict[CorpusName.VALID] = ParallelCorpus(
                CorpusName.VALID,
                opts.data[CorpusName.VALID]["path_src"],
                opts.data[CorpusName.VALID]["path_tgt"] if tgt is None else None,
                opts.data[CorpusName.VALID]["path_align"],
                n_src_feats=opts.n_src_feats,
                src_feats_defaults=opts.src_feats_defaults,
            )
        else:
            return None
    else:
        corpora_dict[CorpusName.INFER] = ParallelCorpus(
            CorpusName.INFER,
            src if src else opts.src,
            tgt if tgt else opts.tgt,
            align if align else None,
            n_src_feats=opts.n_src_feats,
            src_feats_defaults=opts.src_feats_defaults,
        )
    return corpora_dict
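

# Illustrative sketch of the minimal `opts` attributes `get_corpora` reads
# for training; the Namespace stands in for the parsed options and the
# paths are hypothetical.
def _demo_get_corpora():
    from argparse import Namespace

    opts = Namespace(
        data={
            "corpus_1": {
                "path_src": "train.src",
                "path_tgt": "train.tgt",
                "path_align": None,
            }
        },
        n_src_feats=0,
        src_feats_defaults=None,
    )
    corpora = get_corpora(opts, task=CorpusTask.TRAIN)
    assert "corpus_1" in corpora  # no files are opened until .load()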


class ParallelCorpusIterator(object):
    """An iterator dedicated to ParallelCorpus.

    Args:
        corpus (ParallelCorpus): corpus to iterate;
        transform (TransformPipe): transforms to be applied to corpus;
        skip_empty_level (str): security level when encountering an empty line;
        stride (int): iterate corpus with this line stride;
        offset (int): iterate corpus with this line offset.
    """

    def __init__(
        self, corpus, transform, skip_empty_level="warning", stride=1, offset=0
    ):
        self.cid = corpus.id
        self.corpus = corpus
        self.transform = transform
        if skip_empty_level not in ["silent", "warning", "error"]:
            raise ValueError(f"Invalid argument skip_empty_level={skip_empty_level}")
        self.skip_empty_level = skip_empty_level
        self.stride = stride
        self.offset = offset

    def _process(self, stream):
        for i, example in enumerate(stream):
            example["src"] = example["src"].strip().split(" ")
            example["src_original"] = example["src_original"].strip().split(" ")
            if "src_feats" in example:
                example["src_feats"] = [
                    feat.strip().split(" ") for feat in example["src_feats"]
                ]
            # Recover the global line number from the local (strided) index.
            line_number = i * self.stride + self.offset
            example["cid_line_number"] = line_number
            example["cid"] = self.cid
            if "align" in example:
                example["align"] = example["align"].strip().split(" ")
            if example["tgt"] is not None:
                example["tgt"] = example["tgt"].strip().split(" ")
                example["tgt_original"] = example["tgt_original"].strip().split(" ")
                if (
                    len(example["src"]) == 0
                    or len(example["tgt"]) == 0
                    or ("align" in example and len(example["align"]) == 0)
                ):
                    # empty example: skip, unless both sides are empty
                    empty_msg = f"Empty line in {self.cid}#{line_number}."
                    if self.skip_empty_level == "error":
                        raise IOError(empty_msg)
                    elif self.skip_empty_level == "warning":
                        logger.warning(empty_msg)
                    if len(example["src"]) == 0 and len(example["tgt"]) == 0:
                        yield (example, self.transform, self.cid)
                    continue
            yield (example, self.transform, self.cid)
        report_msg = self.transform.stats()
        if report_msg != "":
            logger.info(
                "* Transform statistics for {}({:.2f}%):\n{}\n".format(
                    self.cid, 100 / self.stride, report_msg
                )
            )

    def __iter__(self):
        corpus_stream = self.corpus.load(stride=self.stride, offset=self.offset)
        corpus = self._process(corpus_stream)
        yield from corpus
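

# Usage sketch (illustrative): wrap a corpus with an empty transform pipe
# and iterate tokenized examples; each item is an (example, transform, cid)
# tuple. File names are hypothetical.
def _demo_parallel_corpus_iterator():
    corpus = ParallelCorpus("demo", "train.src", "train.tgt")
    transform_pipe = TransformPipe.build_from([])
    for example, _transform, cid in ParallelCorpusIterator(
        corpus, transform_pipe, skip_empty_level="warning"
    ):
        print(cid, example["cid_line_number"], example["src"])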


def build_corpora_iters(
    corpora, transforms, corpora_info, skip_empty_level="warning", stride=1, offset=0
):
    """Return `ParallelCorpusIterator` for all corpora defined in opts."""
    corpora_iters = dict()
    for c_id, corpus in corpora.items():
        transform_names = corpora_info[c_id].get("transforms", [])
        corpus_transform = [
            transforms[name] for name in transform_names if name in transforms
        ]
        transform_pipe = TransformPipe.build_from(corpus_transform)
        corpus_iter = ParallelCorpusIterator(
            corpus,
            transform_pipe,
            skip_empty_level=skip_empty_level,
            stride=stride,
            offset=offset,
        )
        corpora_iters[c_id] = corpus_iter
    return corpora_iters


def save_transformed_sample(opts, transforms, n_sample=3):
    """Save transformed data sample as specified in opts."""
    if n_sample == -1:
        logger.info(f"n_sample={n_sample}: Save full transformed corpus.")
    elif n_sample == 0:
        logger.info(f"n_sample={n_sample}: no sample will be saved.")
        return
    elif n_sample > 0:
        logger.info(f"Save {n_sample} transformed examples per corpus.")
    else:
        raise ValueError(f"n_sample should be >= -1, got {n_sample}.")

    corpora = get_corpora(opts, CorpusTask.TRAIN)
    datasets_iterables = build_corpora_iters(
        corpora, transforms, opts.data, skip_empty_level=opts.skip_empty_level
    )
    sample_path = os.path.join(os.path.dirname(opts.save_data), CorpusName.SAMPLE)
    os.makedirs(sample_path, exist_ok=True)
    for c_name, c_iter in datasets_iterables.items():
        dest_base = os.path.join(
            sample_path, "{}.{}".format(c_name, CorpusName.SAMPLE)
        )
        with open(dest_base + ".src", "w", encoding="utf-8") as f_src, open(
            dest_base + ".tgt", "w", encoding="utf-8"
        ) as f_tgt:
            bucket = []
            for i, ex in enumerate(c_iter):
                # n_sample == -1 means keep everything.
                if n_sample != -1 and i >= n_sample:
                    break
                bucket.append(ex)
            pro_bucket = process(CorpusTask.TRAIN, bucket)
            if pro_bucket is not None:
                for maybe_example in pro_bucket:
                    if maybe_example is not None:
                        src_line, tgt_line = (
                            maybe_example["src"]["src"],
                            maybe_example["tgt"]["tgt"],
                        )
                        if "feats" in maybe_example["src"]:
                            src_feats_lines = maybe_example["src"]["feats"]
                        else:
                            src_feats_lines = []
                        src_pretty_line = append_features_to_text(
                            src_line, src_feats_lines
                        )
                        f_src.write(src_pretty_line + "\n")
                        f_tgt.write(tgt_line + "\n")
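

# Usage sketch (illustrative): chain `get_corpora` and `build_corpora_iters`
# the way `save_transformed_sample` does, with no transforms loaded. `opts`
# is assumed to carry a parsed `data` section as sketched above.
def _demo_build_corpora_iters(opts):
    corpora = get_corpora(opts, CorpusTask.TRAIN)
    iters = build_corpora_iters(
        corpora,
        transforms={},
        corpora_info=opts.data,
        skip_empty_level="warning",
    )
    for c_name, c_iter in iters.items():
        for example, _transform, _cid in c_iter:
            print(c_name, example["src"][:10])
            break  # show only the first example of each corpus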