Source code for src.corpora.auto

"""
auto.py

Default Dataset/Corpus Utilities. Downloads (if necessary) from the Hugging Face `datasets` Hub, and organizes into
de-facto training, validation, and test splits. Performs additional tokenization and normalization as well.
"""
import logging
import os
from copy import deepcopy
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import datasets
from transformers import BatchEncoding, PreTrainedTokenizer

from src.corpora.detokenization import DATASET_TOKENIZATION_REGISTRY

from .indexer import IndexedDataset

from .tokenization_utils import batch_tokenize


# Nest Overwatch under root `mistral` logger, inheriting formatting!
overwatch = logging.getLogger("mistral.corpora.auto")


def build_indexed_dataset(
    tokenizer: PreTrainedTokenizer,
    paths: Dict[str, Path],
    dataset_id: str,
    dataset_name: Optional[str],
    dataset_dir: Optional[str],
    seq_len: int,
    stride: Optional[int] = None,
    preprocessing_num_proc: int = 64,
    ignore_train: bool = False,
    shuffle_seed: int = 42,
    train_shuffle_buffer_size: Optional[int] = 10000,
) -> Dict[str, IndexedDataset]:
    """Builds Indexed Datasets from a Dataset Dictionary."""
    dataset_key = dataset_id
    if dataset_name is not None:
        dataset_key = f"{dataset_name}-{dataset_id}"

    # First, Normalize Text if Necessary. Tokenization Strategies are in detokenization.py.
    if dataset_dir is not None:
        file_names = os.listdir(dataset_dir)
        file_type = os.path.splitext(file_names[0])[1][1:]
        dataset_files = {}
        dataset_files["train"] = [
            f"{dataset_dir}/{fn}" for fn in file_names if "train" in fn and fn.endswith(file_type)
        ]
        dataset_files["validation"] = [
            f"{dataset_dir}/{fn}" for fn in file_names if "validation" in fn and fn.endswith(file_type)
        ]
        file_type = "json" if file_type == "jsonl" else file_type
        assert file_type in ["json", "txt", "csv"]
        dataset = datasets.load_dataset(
            file_type,
            name=dataset_name,
            data_files=dataset_files,
            cache_dir=str(paths["dataset"]),
        )
    else:
        dataset = datasets.load_dataset(
            dataset_id, name=dataset_name, cache_dir=str(paths["dataset"]), keep_in_memory=True
        )

    if ignore_train and "train" in dataset:
        del dataset["train"]

    dataset = auto_detokenize(dataset_id, dataset, paths["preprocessed"], preprocessing_num_proc)

    # Create Post-Tokenization Cache Paths
    tokenization_cache = paths["preprocessed"] / dataset_key / "preprocessing" / "tokenization"
    tokenization_cache.mkdir(parents=True, exist_ok=True)
    post_tokenization_cache_files = {k: tokenization_cache / f"{k}-tokenized" for k in dataset}

    overwatch.info(f"Building Tokenized Indexed Dataset for {dataset_id}/{dataset_name}...")
    out_datasets = {}
    for k, ds in dataset.items():
        overwatch.info(f"Building Indexed Dataset for {k}")
        token_iter = batch_tokenize(ds, tokenizer, batch_size=1000)
        out_datasets[k] = IndexedDataset.build_or_load(token_iter, post_tokenization_cache_files[k], seq_len, stride)  # type: ignore

    if train_shuffle_buffer_size is not None and "train" in out_datasets:
        out_datasets["train"] = out_datasets["train"].seeded_shuffle(
            seed=shuffle_seed, buffer_size=train_shuffle_buffer_size
        )

    return out_datasets
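

# Hedged usage sketch (not part of the original module): shows one way `build_indexed_dataset` might be
# called for WikiText-103. The GPT-2 tokenizer and the `artifacts/` cache locations are illustrative
# assumptions, not values taken from this codebase.
def _example_build_indexed_wikitext() -> Dict[str, IndexedDataset]:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    paths = {"dataset": Path("artifacts/datasets"), "preprocessed": Path("artifacts/preprocessed")}
    return build_indexed_dataset(
        tokenizer=tokenizer,
        paths=paths,
        dataset_id="wikitext",
        dataset_name="wikitext-103-raw-v1",
        dataset_dir=None,
        seq_len=1024,
    )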


def get_auto_dataset(
    tokenizer: PreTrainedTokenizer,
    paths: Dict[str, Path],
    dataset_id: str = "wikitext",
    dataset_name: str = "wikitext-103-raw-v1",
    validation_ratio: float = 0.0005,
    seq_len: int = 1024,
    preprocessing_num_proc: int = 64,
    stride: int = -1,
    ignore_train: bool = False,
) -> datasets.DatasetDict:
    """Run basic tokenization and grouping to turn a Hugging Face Dataset (via `datasets`) into a torch.Dataset."""
    # Sanity check on input args
    stride = seq_len if stride < 0 else stride
    assert stride <= seq_len, f"Data grouping stride ({stride}) is larger than the sequence length: we are losing data."

    dataset = datasets.load_dataset(
        dataset_id, name=dataset_name, cache_dir=str(paths["dataset"]), keep_in_memory=True
    )

    if "validation" not in dataset:
        assert "train" in dataset, "You must have train in dataset to make a validation dataset"

        # Create Dataset Split Cache Files
        train_fn, val_fn = [str(paths["dataset"] / dataset_id / f"{k}-split.hf") for k in ["train", "val"]]
        dataset = dataset["train"].train_test_split(
            test_size=validation_ratio,
            train_indices_cache_file_name=train_fn,
            test_indices_cache_file_name=val_fn,
        )
        dataset["validation"] = dataset["test"]
        del dataset["test"]

    # Preprocess Dataset in a Streaming Fashion
    assert "train" in dataset, "Field `train` not in Dataset!"
    if ignore_train:
        del dataset["train"]
        assert len(dataset) > 0, "You can't set ignore_train = True when there is only train data"

    # First, Normalize Text if Necessary. Tokenization Strategies are in detokenization.py.
    dataset = auto_detokenize(dataset_id, dataset, paths["preprocessed"], preprocessing_num_proc)

    # Second, run straight-up tokenization
    def tokenize(examples: Dict[str, List[str]]) -> BatchEncoding:
        return tokenizer(examples["text"])

    overwatch.info(f"Tokenizing Dataset via Multiprocessing with `{preprocessing_num_proc}` threads...")

    # Create Post-Tokenization Cache Paths
    post_tokenization_cache_files = {
        k: str(paths["preprocessed"] / dataset_id / "preprocessing" / "tokenization" / f"{k}-tokenized.hf")
        for k in dataset
    }
    # Create Parent Path of Cache Files
    (paths["preprocessed"] / dataset_id / "preprocessing" / "tokenization").mkdir(parents=True, exist_ok=True)

    tokenized_dataset = dataset.map(
        tokenize,
        batched=True,
        # tokenization is parallelized by huggingface's fast tokenizers
        num_proc=1 if tokenizer.is_fast else preprocessing_num_proc,
        remove_columns=next(iter(dataset.values())).column_names,
        cache_file_names=post_tokenization_cache_files,
        load_from_cache_file=True,
    )

    # Finally, actually run chunking (collapse multiple sequences into a giant document to read `seq_len` chunks from)
    def group(examples: Dict[str, Iterable[List[int]]]) -> Dict[str, List[List[int]]]:
        # Concatenate all the Texts
        concatenated: Dict[str, List[int]] = {k: sum(examples[k], []) for k in examples.keys()}
        total_length = len(concatenated[list(examples.keys())[0]])

        # Drop the "very last" bit of the dataset that doesn't fit into block size...
        total_length = ((total_length - seq_len + stride) // stride) * stride

        # Split by Chunks of Maximum Length
        result = {k: [t[i : i + seq_len] for i in range(0, total_length, stride)] for k, t in concatenated.items()}
        result["labels"] = deepcopy(result["input_ids"])

        # Mask out losses in overlapping regions. If training data, stride will be equal to seq_len.
        for i, labels in enumerate(result["labels"]):
            if i == 0:
                continue
            for j in range(len(labels) - stride):
                labels[j] = -100
            result["labels"][i] = labels

        return result

    # From HF.Examples :: Note that with `batched=True`, this map processes 1,000 texts together, so group_texts throws
    # away a remainder for each of those groups of 1,000 texts. You can adjust that batch_size here but a higher
    # value might be slower to preprocess.
    #   - Sidd Note (3/11): We're dropping a max of 8M / 9B tokens... we're probably fine!
    overwatch.info(f"Auto-Batching Dataset via Multiprocessing with `{preprocessing_num_proc}` threads...")

    # Create Post-Chunking Cache Paths
    post_chunking_cache_files = {
        k: str(paths["preprocessed"] / dataset_id / "preprocessing" / "chunking" / f"{k}-stride={stride}-chunked.hf")
        for k in dataset
    }
    # Create Parent Path of Cache Files
    (paths["preprocessed"] / dataset_id / "preprocessing" / "chunking").mkdir(parents=True, exist_ok=True)

    lm_dataset = tokenized_dataset.map(
        group,
        batched=True,
        num_proc=preprocessing_num_proc,
        cache_file_names=post_chunking_cache_files,
        load_from_cache_file=True,
    )

    return lm_dataset
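

# Hedged usage sketch (not part of the original module): a minimal call to `get_auto_dataset` with its
# default WikiText-103 configuration. The GPT-2 tokenizer and `artifacts/` cache paths are assumptions
# for illustration only.
def _example_get_auto_dataset() -> datasets.DatasetDict:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    paths = {"dataset": Path("artifacts/datasets"), "preprocessed": Path("artifacts/preprocessed")}
    # Uses the defaults dataset_id="wikitext", dataset_name="wikitext-103-raw-v1", and stride == seq_len.
    return get_auto_dataset(tokenizer, paths, seq_len=1024, preprocessing_num_proc=4)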


def auto_detokenize(
    dataset_id: str, dataset: datasets.DatasetDict, preprocess_path: Path, preprocessing_num_proc: int = 4
) -> datasets.DatasetDict:
    if dataset_id in DATASET_TOKENIZATION_REGISTRY:
        overwatch.info(f"Detokenizing Dataset via Multiprocessing with `{preprocessing_num_proc}` threads...")

        # Create Post-Detokenization Cache Paths
        post_detokenization_cache_files = {
            k: str(preprocess_path / dataset_id / "preprocessing" / "detokenization" / f"{k}-detokenized.hf")
            for k in dataset
        }
        # Create Parent Path of Cache Files
        (preprocess_path / dataset_id / "preprocessing" / "detokenization").mkdir(parents=True, exist_ok=True)

        detokenized_dataset = dataset.map(
            DATASET_TOKENIZATION_REGISTRY[dataset_id],
            num_proc=preprocessing_num_proc,
            cache_file_names=post_detokenization_cache_files,
            load_from_cache_file=True,
        )
    else:
        detokenized_dataset = dataset

    return detokenized_dataset
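

# Hedged sketch (assumption, not taken from detokenization.py): `auto_detokenize` passes a registry entry
# straight to `datasets.map` without `batched=True`, so a handler is a per-example function returning an
# updated "text" field. The key "my_dataset" and the whitespace fix-ups below are purely illustrative.
def _example_detokenization_handler(example: Dict[str, str]) -> Dict[str, str]:
    # Undo common artifacts of pre-tokenized corpora (spaces before punctuation / contractions).
    text = example["text"].replace(" .", ".").replace(" ,", ",").replace(" 's", "'s")
    return {"text": text}


# Registering the handler would let `auto_detokenize("my_dataset", ...)` pick it up automatically:
# DATASET_TOKENIZATION_REGISTRY["my_dataset"] = _example_detokenization_handler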


def get_lambada(
    tokenizer: PreTrainedTokenizer,
    paths: Dict[str, Path],
    dataset_id: str = "lambada",
    dataset_name: Optional[str] = None,
    validation_ratio: float = 0.0005,
    seq_len: int = 1024,
    preprocessing_num_proc: int = 4,
    stride: int = -1,
    ignore_train: bool = False,
) -> datasets.DatasetDict:
    """
    Run special tokenization and grouping for the Lambada dataset.

    Taken from https://github.com/NVIDIA/Megatron-LM/blob/main/tasks/zeroshot_gpt2/datasets.py
    """
    overwatch.info(f"Preprocessing LAMBADA Dataset via Multiprocessing with `{preprocessing_num_proc}` threads...")

    # Sanity check on Input Arguments
    stride = seq_len if stride < 0 else stride
    assert stride <= seq_len, f"Data grouping stride ({stride}) is larger than the sequence length; we are losing data."

    dataset = datasets.load_dataset(dataset_id, dataset_name, cache_dir=str(paths["dataset"]), keep_in_memory=True)
    del dataset["train"]

    def tokenize_and_group(example: Dict[str, str]) -> Dict[str, List[int]]:
        text = example["text"]
        last_token = text.split()[-1]
        start_idx = text.rfind(last_token)
        beginning_tokens, last_token = tokenizer.encode(text[:start_idx].strip()), tokenizer.encode(" " + last_token)

        num_pad = seq_len - len(beginning_tokens) - len(last_token)
        assert num_pad >= 0, "LAMBADA example is longer than sequence length, will result in error."

        input_ids = beginning_tokens + last_token + [tokenizer.eos_token_id for _ in range(num_pad)]
        labels = [-100 for _ in beginning_tokens] + [tok for tok in last_token] + [-100 for _ in range(num_pad)]
        attention_mask = [1 for _ in range(len(beginning_tokens) + len(last_token))] + [0 for _ in range(num_pad)]
        return {"input_ids": input_ids, "labels": labels, "attention_mask": attention_mask}

    # Create Preprocessing Cache Paths
    post_preprocess_cache_files = {
        k: str(paths["preprocessed"] / "lambada" / "preprocessing" / f"{k}-processed.hf") for k in dataset
    }
    # Create Parent Path of Cache Files
    (paths["preprocessed"] / "lambada" / "preprocessing").mkdir(parents=True, exist_ok=True)

    processed_dataset = dataset.map(
        tokenize_and_group,
        batched=False,
        num_proc=preprocessing_num_proc,
        remove_columns=next(iter(dataset.values())).column_names,
        cache_file_names=post_preprocess_cache_files,
        load_from_cache_file=True,
    )

    return processed_dataset
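

# Hedged illustration (not part of the original module): restates the label layout produced by
# `tokenize_and_group` above with schematic token ids and seq_len=8, so that only the final-word token
# contributes to the loss. The ids (11..15, 99) and the eos id 50256 are arbitrary placeholders.
def _example_lambada_layout() -> Dict[str, List[int]]:
    context, target, eos_id, seq_len = [11, 12, 13, 14, 15], [99], 50256, 8
    num_pad = seq_len - len(context) - len(target)
    return {
        "input_ids": context + target + [eos_id] * num_pad,
        "labels": [-100] * len(context) + target + [-100] * num_pad,
        "attention_mask": [1] * (len(context) + len(target)) + [0] * num_pad,
    }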


# Mapping of eval dataset name -> HF ids, names, and method for generating dataset
ONLINE_EVAL_DATA_REGISTRY = {
    "wikitext": {"id": "wikitext", "name": "wikitext-103-raw-v1", "generator": get_auto_dataset},
    "lambada": {"id": "lambada", "name": None, "generator": get_lambada},
}
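

# Hedged usage sketch (not part of the original module): the registry above lets callers dispatch online-eval
# dataset construction by name. The GPT-2 tokenizer and `artifacts/` cache paths are illustrative assumptions.
def _example_online_eval(name: str = "lambada") -> datasets.DatasetDict:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    paths = {"dataset": Path("artifacts/datasets"), "preprocessed": Path("artifacts/preprocessed")}
    entry = ONLINE_EVAL_DATA_REGISTRY[name]
    return entry["generator"](tokenizer, paths, dataset_id=entry["id"], dataset_name=entry["name"])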