fastNLP.core.vocabulary 源代码

r"""
.. todo::
    doc
"""

__all__ = [
    "Vocabulary",
    "VocabularyOption",
]

from collections import Counter
from functools import partial
from functools import wraps

from ._logger import logger
from .dataset import DataSet
from .utils import Option
from .utils import _is_iterable
import io


class VocabularyOption(Option):
    def __init__(self,
                 max_size=None,
                 min_freq=None,
                 padding='<pad>',
                 unknown='<unk>'):
        super().__init__(
            max_size=max_size,
            min_freq=min_freq,
            padding=padding,
            unknown=unknown
        )


def _check_build_vocab(func):
    r"""A decorator to make sure the indexing is built before used.

    """
    
    @wraps(func)  # to solve missing docstring
    def _wrapper(self, *args, **kwargs):
        if self._word2idx is None or self.rebuild is True:
            self.build_vocab()
        return func(self, *args, **kwargs)
    
    return _wrapper


def _check_build_status(func):
    r"""A decorator to check whether the vocabulary updates after the last build.

    """
    
    @wraps(func)  # to solve missing docstring
    def _wrapper(self, *args, **kwargs):
        if self.rebuild is False:
            self.rebuild = True
            if self.max_size is not None and len(self.word_count) >= self.max_size:
                logger.info("[Warning] Vocabulary has reached the max size {} when calling {} method. "
                            "Adding more words may cause unexpected behaviour of Vocabulary. ".format(
                    self.max_size, func.__name__))
        return func(self, *args, **kwargs)
    
    return _wrapper


[文档]class Vocabulary(object): r""" 用于构建, 存储和使用 `str` 到 `int` 的一一映射:: vocab = Vocabulary() word_list = "this is a word list".split() vocab.update(word_list) vocab["word"] # str to int vocab.to_word(5) # int to str """
[文档] def __init__(self, max_size=None, min_freq=None, padding='<pad>', unknown='<unk>'): r""" :param int max_size: `Vocabulary` 的最大大小, 即能存储词的最大数量 若为 ``None`` , 则不限制大小. Default: ``None`` :param int min_freq: 能被记录下的词在文本中的最小出现频率, 应大于或等于 1. 若小于该频率, 词语将被视为 `unknown`. 若为 ``None`` , 所有文本中的词都被记录. Default: ``None`` :param str optional padding: padding的字符. 如果设置为 ``None`` , 则vocabulary中不考虑padding, 也不计入词表大小,为 ``None`` 的情况多在为label建立Vocabulary的情况. Default: '<pad>' :param str optional unknown: unknown的字符,所有未被记录的词在转为 `int` 时将被视为unknown. 如果设置为 ``None`` ,则vocabulary中不考虑unknow, 也不计入词表大小. 为 ``None`` 的情况多在为label建立Vocabulary的情况. Default: '<unk>' """ self.max_size = max_size self.min_freq = min_freq self.word_count = Counter() self.unknown = unknown self.padding = padding self._word2idx = None self._idx2word = None self.rebuild = True # 用于承载不需要单独创建entry的词语,具体见from_dataset()方法 self._no_create_word = Counter()
@property @_check_build_vocab def word2idx(self): return self._word2idx @word2idx.setter def word2idx(self, value): self._word2idx = value @property @_check_build_vocab def idx2word(self): return self._idx2word @idx2word.setter def idx2word(self, value): self._word2idx = value
[文档] @_check_build_status def update(self, word_lst, no_create_entry=False): r"""依次增加序列中词在词典中的出现频率 :param list word_lst: a list of strings :param bool no_create_entry: 如果词语来自于非训练集建议设置为True。在使用fastNLP.TokenEmbedding加载预训练模型时,没有从预训练词表中找到这个词的处理方式。 如果为True,则不会有这个词语创建一个单独的entry,它将一直被指向unk的表示; 如果为False,则为这个词创建一个单独 的entry。如果这个word来自于dev或者test,一般设置为True,如果来自与train一般设置为False。以下两种情况: 如果新 加入一个word,且no_create_entry为True,但这个词之前已经在Vocabulary中且并不是no_create_entry的,则还是会为这 个词创建一个单独的vector; 如果no_create_entry为False,但这个词之前已经在Vocabulary中且并不是no_create_entry的, 则这个词将认为是需要创建单独的vector的。 """ self._add_no_create_entry(word_lst, no_create_entry) self.word_count.update(word_lst) return self
[文档] @_check_build_status def add(self, word, no_create_entry=False): r""" 增加一个新词在词典中的出现频率 :param str word: 新词 :param bool no_create_entry: 如果词语来自于非训练集建议设置为True。在使用fastNLP.TokenEmbedding加载预训练模型时,没有从预训练词表中找到这个词的处理方式。 如果为True,则不会有这个词语创建一个单独的entry,它将一直被指向unk的表示; 如果为False,则为这个词创建一个单独 的entry。如果这个word来自于dev或者test,一般设置为True,如果来自与train一般设置为False。以下两种情况: 如果新 加入一个word,且no_create_entry为True,但这个词之前已经在Vocabulary中且并不是no_create_entry的,则还是会为这 个词创建一个单独的vector; 如果no_create_entry为False,但这个词之前已经在Vocabulary中且并不是no_create_entry的, 则这个词将认为是需要创建单独的vector的。 """ self._add_no_create_entry(word, no_create_entry) self.word_count[word] += 1 return self
def _add_no_create_entry(self, word, no_create_entry): r""" 在新加入word时,检查_no_create_word的设置。 :param str List[str] word: :param bool no_create_entry: :return: """ if isinstance(word, str) or not _is_iterable(word): word = [word] for w in word: if no_create_entry and self.word_count.get(w, 0) == self._no_create_word.get(w, 0): self._no_create_word[w] += 1 elif not no_create_entry and w in self._no_create_word: self._no_create_word.pop(w)
[文档] @_check_build_status def add_word(self, word, no_create_entry=False): r""" 增加一个新词在词典中的出现频率 :param str word: 新词 :param bool no_create_entry: 如果词语来自于非训练集建议设置为True。在使用fastNLP.TokenEmbedding加载预训练模型时,没有从预训练词表中找到这个词的处理方式。 如果为True,则不会有这个词语创建一个单独的entry,它将一直被指向unk的表示; 如果为False,则为这个词创建一个单独 的entry。如果这个word来自于dev或者test,一般设置为True,如果来自与train一般设置为False。以下两种情况: 如果新 加入一个word,且no_create_entry为True,但这个词之前已经在Vocabulary中且并不是no_create_entry的,则还是会为这 个词创建一个单独的vector; 如果no_create_entry为False,但这个词之前已经在Vocabulary中且并不是no_create_entry的, 则这个词将认为是需要创建单独的vector的。 """ self.add(word, no_create_entry=no_create_entry)
[文档] @_check_build_status def add_word_lst(self, word_lst, no_create_entry=False): r""" 依次增加序列中词在词典中的出现频率 :param list[str] word_lst: 词的序列 :param bool no_create_entry: 如果词语来自于非训练集建议设置为True。在使用fastNLP.TokenEmbedding加载预训练模型时,没有从预训练词表中找到这个词的处理方式。 如果为True,则不会有这个词语创建一个单独的entry,它将一直被指向unk的表示; 如果为False,则为这个词创建一个单独 的entry。如果这个word来自于dev或者test,一般设置为True,如果来自与train一般设置为False。以下两种情况: 如果新 加入一个word,且no_create_entry为True,但这个词之前已经在Vocabulary中且并不是no_create_entry的,则还是会为这 个词创建一个单独的vector; 如果no_create_entry为False,但这个词之前已经在Vocabulary中且并不是no_create_entry的, 则这个词将认为是需要创建单独的vector的。 """ self.update(word_lst, no_create_entry=no_create_entry) return self
[文档] def build_vocab(self): r""" 根据已经出现的词和出现频率构建词典. 注意: 重复构建可能会改变词典的大小, 但已经记录在词典中的词, 不会改变对应的 `int` """ if self._word2idx is None: self._word2idx = {} if self.padding is not None: self._word2idx[self.padding] = len(self._word2idx) if (self.unknown is not None) and (self.unknown != self.padding): self._word2idx[self.unknown] = len(self._word2idx) max_size = min(self.max_size, len(self.word_count)) if self.max_size else None words = self.word_count.most_common(max_size) if self.min_freq is not None: words = filter(lambda kv: kv[1] >= self.min_freq, words) if self._word2idx is not None: words = filter(lambda kv: kv[0] not in self._word2idx, words) start_idx = len(self._word2idx) self._word2idx.update({w: i + start_idx for i, (w, _) in enumerate(words)}) self.build_reverse_vocab() self.rebuild = False return self
[文档] def build_reverse_vocab(self): r""" 基于 `word to index` dict, 构建 `index to word` dict. """ self._idx2word = {i: w for w, i in self._word2idx.items()} return self
@_check_build_vocab def __len__(self): return len(self._word2idx) @_check_build_vocab def __contains__(self, item): r""" 检查词是否被记录 :param item: the word :return: True or False """ return item in self._word2idx
[文档] def has_word(self, w): r""" 检查词是否被记录:: has_abc = vocab.has_word('abc') # equals to has_abc = 'abc' in vocab :param item: the word :return: ``True`` or ``False`` """ return self.__contains__(w)
@_check_build_vocab def __getitem__(self, w): r""" To support usage like:: vocab[w] """ if w in self._word2idx: return self._word2idx[w] if self.unknown is not None: return self._word2idx[self.unknown] else: raise ValueError("word `{}` not in vocabulary".format(w))
[文档] @_check_build_vocab def index_dataset(self, *datasets, field_name, new_field_name=None): r""" 将DataSet中对应field的词转为数字,Example:: # remember to use `field_name` vocab.index_dataset(train_data, dev_data, test_data, field_name='words') :param ~fastNLP.DataSet,List[~fastNLP.DataSet] datasets: 需要转index的一个或多个数据集 :param list,str field_name: 需要转index的field, 若有多个 DataSet, 每个DataSet都必须有此 field. 目前支持 ``str`` , ``List[str]`` :param list,str new_field_name: 保存结果的field_name. 若为 ``None`` , 将覆盖原field. Default: ``None``. """ def index_instance(field): r""" 有几种情况, str, 1d-list, 2d-list :param ins: :return: """ if isinstance(field, str) or not _is_iterable(field): return self.to_index(field) else: if isinstance(field[0], str) or not _is_iterable(field[0]): return [self.to_index(w) for w in field] else: if not isinstance(field[0][0], str) and _is_iterable(field[0][0]): raise RuntimeError("Only support field with 2 dimensions.") return [[self.to_index(c) for c in w] for w in field] new_field_name = new_field_name or field_name if type(new_field_name) == type(field_name): if isinstance(new_field_name, list): assert len(new_field_name) == len(field_name), "new_field_name should have same number elements with " \ "field_name." elif isinstance(new_field_name, str): field_name = [field_name] new_field_name = [new_field_name] else: raise TypeError("field_name and new_field_name can only be str or List[str].") for idx, dataset in enumerate(datasets): if isinstance(dataset, DataSet): try: for f_n, n_f_n in zip(field_name, new_field_name): dataset.apply_field(index_instance, field_name=f_n, new_field_name=n_f_n) except Exception as e: logger.info("When processing the `{}` dataset, the following error occurred.".format(idx)) raise e else: raise RuntimeError("Only DataSet type is allowed.") return self
@property def _no_create_word_length(self): return len(self._no_create_word)
[文档] def from_dataset(self, *datasets, field_name, no_create_entry_dataset=None): r""" 使用dataset的对应field中词构建词典:: # remember to use `field_name` vocab.from_dataset(train_data1, train_data2, field_name='words') :param ~fastNLP.DataSet,List[~fastNLP.DataSet] datasets: 需要转index的一个或多个数据集 :param str,List[str] field_name: 可为 ``str`` 或 ``List[str]`` . 构建词典所使用的 field(s), 支持一个或多个field,若有多个 DataSet, 每个DataSet都必须有这些field. 目前支持的field结构 : ``str`` , ``List[str]`` :param no_create_entry_dataset: 可以传入DataSet, List[DataSet]或者None(默认), 建议直接将非训练数据都传入到这个参数。该选项用在接下来的模型会使用pretrain 的embedding(包括glove, word2vec, elmo与bert)且会finetune的情况。如果仅使用来自于train的数据建立vocabulary,会导致test与dev 中的数据无法充分利用到来自于预训练embedding的信息,所以在建立词表的时候将test与dev考虑进来会使得最终的结果更好。 如果一个词出现在了train中,但是没在预训练模型中,embedding会为它用unk初始化,但它是单独的一个vector,如果 finetune embedding的话,这个词在更新之后可能会有更好的表示; 而如果这个词仅出现在了dev或test中,那么就不能为它们单独建立vector, 而应该让它指向unk这个vector的值。所以只位于no_create_entry_dataset中的token,将首先从预训练的词表中寻找它的表示, 如果找到了,就使用该表示; 如果没有找到,则认为该词的表示应该为unk的表示。 :return self: """ if isinstance(field_name, str): field_name = [field_name] elif not isinstance(field_name, list): raise TypeError('invalid argument field_name: {}'.format(field_name)) def construct_vocab(ins, no_create_entry=False): for fn in field_name: field = ins[fn] if isinstance(field, str) or not _is_iterable(field): self.add_word(field, no_create_entry=no_create_entry) else: if isinstance(field[0], str) or not _is_iterable(field[0]): for word in field: self.add_word(word, no_create_entry=no_create_entry) else: if not isinstance(field[0][0], str) and _is_iterable(field[0][0]): raise RuntimeError("Only support field with 2 dimensions.") for words in field: for word in words: self.add_word(word, no_create_entry=no_create_entry) for idx, dataset in enumerate(datasets): if isinstance(dataset, DataSet): try: dataset.apply(construct_vocab) except BaseException as e: logger.error("When processing the `{}` dataset, the following error occurred:".format(idx)) raise e else: raise TypeError("Only DataSet type is allowed.") if no_create_entry_dataset is not None: partial_construct_vocab = partial(construct_vocab, no_create_entry=True) if isinstance(no_create_entry_dataset, DataSet): no_create_entry_dataset.apply(partial_construct_vocab) elif isinstance(no_create_entry_dataset, list): for dataset in no_create_entry_dataset: if not isinstance(dataset, DataSet): raise TypeError("Only DataSet type is allowed.") dataset.apply(partial_construct_vocab) return self
def _is_word_no_create_entry(self, word): r""" 判断当前的word是否是不需要创建entry的,具体参见from_dataset的说明 :param word: str :return: bool """ return word in self._no_create_word
[文档] def to_index(self, w): r""" 将词转为数字. 若词不再词典中被记录, 将视为 unknown, 若 ``unknown=None`` , 将抛出 ``ValueError`` :: index = vocab.to_index('abc') # equals to index = vocab['abc'] :param str w: a word :return int index: the number """ return self.__getitem__(w)
@property @_check_build_vocab def unknown_idx(self): r""" unknown 对应的数字. """ if self.unknown is None: return None return self._word2idx[self.unknown] @property @_check_build_vocab def padding_idx(self): r""" padding 对应的数字 """ if self.padding is None: return None return self._word2idx[self.padding]
[文档] @_check_build_vocab def to_word(self, idx): r""" 给定一个数字, 将其转为对应的词. :param int idx: the index :return str word: the word """ return self._idx2word[idx]
[文档] def clear(self): r""" 删除Vocabulary中的词表数据。相当于重新初始化一下。 :return: """ self.word_count.clear() self._word2idx = None self._idx2word = None self.rebuild = True self._no_create_word.clear() return self
def __getstate__(self): r"""Use to prepare data for pickle. """ len(self) # make sure vocab has been built state = self.__dict__.copy() # no need to pickle _idx2word as it can be constructed from _word2idx del state['_idx2word'] return state def __setstate__(self, state): r"""Use to restore state from pickle. """ self.__dict__.update(state) self.build_reverse_vocab() def __repr__(self): return "Vocabulary({}...)".format(list(self.word_count.keys())[:5]) @_check_build_vocab def __iter__(self): # 依次(word1, 0), (word1, 1) for index in range(len(self._word2idx)): yield self.to_word(index), index
[文档] def save(self, filepath): r""" :param str,io.StringIO filepath: Vocabulary的储存路径 :return: """ if isinstance(filepath, io.IOBase): assert filepath.writable() f = filepath elif isinstance(filepath, str): try: f = open(filepath, 'w', encoding='utf-8') except Exception as e: raise e else: raise TypeError("Illegal `filepath`.") f.write(f'max_size\t{self.max_size}\n') f.write(f'min_freq\t{self.min_freq}\n') f.write(f'unknown\t{self.unknown}\n') f.write(f'padding\t{self.padding}\n') f.write(f'rebuild\t{self.rebuild}\n') f.write('\n') # idx: 如果idx为-2, 说明还没有进行build; 如果idx为-1,说明该词未编入 # no_create_entry: 如果为1,说明该词是no_create_entry; 0 otherwise # word \t count \t idx \t no_create_entry \n idx = -2 for word, count in self.word_count.items(): if self._word2idx is not None: idx = self._word2idx.get(word, -1) is_no_create_entry = int(self._is_word_no_create_entry(word)) f.write(f'{word}\t{count}\t{idx}\t{is_no_create_entry}\n') if isinstance(filepath, str): # 如果是file的话就关闭 f.close()
[文档] @staticmethod def load(filepath): r""" :param str,io.StringIO filepath: Vocabulary的读取路径 :return: Vocabulary """ if isinstance(filepath, io.IOBase): assert filepath.writable() f = filepath elif isinstance(filepath, str): try: f = open(filepath, 'r', encoding='utf-8') except Exception as e: raise e else: raise TypeError("Illegal `filepath`.") vocab = Vocabulary() for line in f: line = line.strip('\n') if line: name, value = line.split() if name in ('max_size', 'min_freq'): value = int(value) if value!='None' else None setattr(vocab, name, value) elif name in ('unknown', 'padding'): value = value if value!='None' else None setattr(vocab, name, value) elif name == 'rebuild': vocab.rebuild = True if value=='True' else False else: break word_counter = {} no_create_entry_counter = {} word2idx = {} for line in f: line = line.strip('\n') if line: parts = line.split('\t') word,count,idx,no_create_entry = parts[0], int(parts[1]), int(parts[2]), int(parts[3]) if idx >= 0: word2idx[word] = idx word_counter[word] = count if no_create_entry: no_create_entry_counter[word] = count word_counter = Counter(word_counter) no_create_entry_counter = Counter(no_create_entry_counter) if len(word2idx)>0: if vocab.padding: word2idx[vocab.padding] = 0 if vocab.unknown: word2idx[vocab.unknown] = 1 if vocab.padding else 0 idx2word = {value:key for key,value in word2idx.items()} vocab.word_count = word_counter vocab._no_create_word = no_create_entry_counter if word2idx: vocab._word2idx = word2idx vocab._idx2word = idx2word if isinstance(filepath, str): # 如果是file的话就关闭 f.close() return vocab