r"""
.. todo::
doc
"""
__all__ = [
"ElmoEmbedding"
]
import codecs
import json
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from .contextual_embedding import ContextualEmbedding
from ..core import logger
from ..core.vocabulary import Vocabulary
from ..io.file_utils import cached_path, _get_embedding_url, PRETRAINED_ELMO_MODEL_DIR
from ..modules.encoder._elmo import ElmobiLm, ConvTokenEmbedder
[文档]class ElmoEmbedding(ContextualEmbedding):
r"""
使用ELMo的embedding。初始化之后,只需要传入words就可以得到对应的embedding。
当前支持的使用名称初始化的模型:
.. code::
en: 即en-medium hidden_size 1024; output_size 12
en-medium: hidden_size 2048; output_size 256
en-origial: hidden_size 4096; output_size 512
en-original-5.5b: hidden_size 4096; output_size 512
en-small: hidden_size 1024; output_size 128
Example::
>>> import torch
>>> from fastNLP import Vocabulary
>>> from fastNLP.embeddings import ElmoEmbedding
>>> vocab = Vocabulary().add_word_lst("The whether is good .".split())
>>> # 使用不同层的concat的结果
>>> embed = ElmoEmbedding(vocab, model_dir_or_name='en', layers='1,2', requires_grad=False)
>>> words = torch.LongTensor([[vocab.to_index(word) for word in "The whether is good .".split()]])
>>> outputs = embed(words)
>>> outputs.size()
>>> # torch.Size([1, 5, 2048])
>>> # 使用不同层的weighted sum。
>>> embed = ElmoEmbedding(vocab, model_dir_or_name='en', layers='mix', requires_grad=False)
>>> embed.set_mix_weights_requires_grad() # 使得weighted的权重是可以学习的,但ELMO的LSTM部分是不更新
"""
[文档] def __init__(self, vocab: Vocabulary, model_dir_or_name: str = 'en', layers: str = '2', requires_grad: bool = True,
word_dropout=0.0, dropout=0.0, cache_word_reprs: bool = False):
r"""
:param vocab: 词表
:param model_dir_or_name: 可以有两种方式调用预训练好的ELMo embedding:第一种是传入ELMo所在文件夹,该文件夹下面应该有两个文件,
其中一个是以json为后缀的配置文件,另一个是以pkl为后缀的权重文件;第二种是传入ELMo版本的名称,将自动查看缓存中是否存在该模型,
没有的话将自动下载并缓存。
:param layers: str, 指定返回的层数(从0开始), 以,隔开不同的层。如果要返回第二层的结果'2', 返回后两层的结果'1,2'。不同的层的结果
按照这个顺序concat起来,默认为'2'。'mix'会使用可学习的权重结合不同层的表示(权重是否可训练与requires_grad保持一致,
初始化权重对三层结果进行mean-pooling, 可以通过ElmoEmbedding.set_mix_weights_requires_grad()方法只将mix weights设置为可学习。)
:param requires_grad: bool, 该层是否需要gradient, 默认为False.
:param float word_dropout: 以多大的概率将一个词替换为unk。这样既可以训练unk也是一定的regularize。
:param float dropout: 以多大的概率对embedding的表示进行Dropout。0.1即随机将10%的值置为0。
:param cache_word_reprs: 可以选择对word的表示进行cache; 设置为True的话,将在初始化的时候为每个word生成对应的embedding,
并删除character encoder,之后将直接使用cache的embedding。默认为False。
"""
super(ElmoEmbedding, self).__init__(vocab, word_dropout=word_dropout, dropout=dropout)
# 根据model_dir_or_name检查是否存在并下载
if model_dir_or_name.lower() in PRETRAINED_ELMO_MODEL_DIR:
model_url = _get_embedding_url('elmo', model_dir_or_name.lower())
model_dir = cached_path(model_url, name='embedding')
# 检查是否存在
elif os.path.isdir(os.path.abspath(os.path.expanduser(model_dir_or_name))):
model_dir = model_dir_or_name
else:
raise ValueError(f"Cannot recognize {model_dir_or_name}.")
self.model = _ElmoModel(model_dir, vocab, cache_word_reprs=cache_word_reprs)
num_layers = self.model.encoder.num_layers
if layers == 'mix':
self.layer_weights = nn.Parameter(torch.zeros(self.model.config['lstm']['n_layers'] + 1),
requires_grad=requires_grad)
self.gamma = nn.Parameter(torch.ones(1), requires_grad=requires_grad)
self._get_outputs = self._get_mixed_outputs
self._embed_size = self.model.config['lstm']['projection_dim'] * 2
else:
layers = list(map(int, layers.split(',')))
assert len(layers) > 0, "Must choose at least one output, but got None."
for layer in layers:
assert 0 <= layer <= num_layers, f"Layer index should be in range [0, {num_layers}], but got {layer}."
self.layers = layers
self._get_outputs = self._get_layer_outputs
self._embed_size = len(self.layers) * self.model.config['lstm']['projection_dim'] * 2
self.requires_grad = requires_grad
def _get_mixed_outputs(self, outputs):
# outputs: num_layers x batch_size x max_len x hidden_size
# return: batch_size x max_len x hidden_size
weights = F.softmax(self.layer_weights + 1 / len(outputs), dim=0).to(outputs)
outputs = torch.einsum('l,lbij->bij', weights, outputs)
return self.gamma.to(outputs) * outputs
[文档] def set_mix_weights_requires_grad(self, flag=True):
r"""
当初始化ElmoEmbedding时layers被设置为mix时,可以通过调用该方法设置mix weights是否可训练。如果layers不是mix,调用
该方法没有用。
:param bool flag: 混合不同层表示的结果是否可以训练。
:return:
"""
if hasattr(self, 'layer_weights'):
self.layer_weights.requires_grad = flag
self.gamma.requires_grad = flag
def _get_layer_outputs(self, outputs):
if len(self.layers) == 1:
outputs = outputs[self.layers[0]]
else:
outputs = torch.cat(tuple([*outputs[self.layers]]), dim=-1)
return outputs
[文档] def forward(self, words: torch.LongTensor):
r"""
计算words的elmo embedding表示。根据elmo文章中介绍的ELMO实际上是有2L+1层结果,但是为了让结果比较容易拆分,token的
被重复了一次,使得实际上layer=0的结果是[token_embedding;token_embedding], 而layer=1的结果是[forward_hiddens;
backward_hiddens].
:param words: batch_size x max_len
:return: torch.FloatTensor. batch_size x max_len x (512*len(self.layers))
"""
words = self.drop_word(words)
outputs = self._get_sent_reprs(words)
if outputs is not None:
return self.dropout(outputs)
outputs = self.model(words)
outputs = self._get_outputs(outputs)
return self.dropout(outputs)
def _delete_model_weights(self):
for name in ['layers', 'model', 'layer_weights', 'gamma']:
if hasattr(self, name):
delattr(self, name)
class _ElmoModel(nn.Module):
r"""
该Module是ElmoEmbedding中进行所有的heavy lifting的地方。做的工作,包括
(1) 根据配置,加载模型;
(2) 根据vocab,对模型中的embedding进行调整. 并将其正确初始化
(3) 保存一个words与chars的对应转换,获取时自动进行相应的转换
(4) 设计一个保存token的embedding,允许缓存word的表示。
"""
def __init__(self, model_dir: str, vocab: Vocabulary = None, cache_word_reprs: bool = False):
super(_ElmoModel, self).__init__()
self.model_dir = model_dir
dir = os.walk(self.model_dir)
config_file = None
weight_file = None
config_count = 0
weight_count = 0
for path, dir_list, file_list in dir:
for file_name in file_list:
if file_name.__contains__(".json"):
config_file = file_name
config_count += 1
elif file_name.__contains__(".pkl"):
weight_file = file_name
weight_count += 1
if config_count > 1 or weight_count > 1:
raise Exception(f"Multiple config files(*.json) or weight files(*.hdf5) detected in {model_dir}.")
elif config_count == 0 or weight_count == 0:
raise Exception(f"No config file or weight file found in {model_dir}")
with open(os.path.join(model_dir, config_file), 'r') as config_f:
config = json.load(config_f)
self.weight_file = os.path.join(model_dir, weight_file)
self.config = config
OOV_TAG = '<oov>'
PAD_TAG = '<pad>'
BOS_TAG = '<bos>'
EOS_TAG = '<eos>'
BOW_TAG = '<bow>'
EOW_TAG = '<eow>'
# For the model trained with character-based word encoder.
char_lexicon = {}
with codecs.open(os.path.join(model_dir, 'char.dic'), 'r', encoding='utf-8') as fpi:
for line in fpi:
tokens = line.strip().split('\t')
if len(tokens) == 1:
tokens.insert(0, '\u3000')
token, i = tokens
char_lexicon[token] = int(i)
# 做一些sanity check
for special_word in [PAD_TAG, OOV_TAG, BOW_TAG, EOW_TAG]:
assert special_word in char_lexicon, f"{special_word} not found in char.dic."
# 从vocab中构建char_vocab
char_vocab = Vocabulary(unknown=OOV_TAG, padding=PAD_TAG)
# 需要保证<bow>与<eow>在里面
char_vocab.add_word_lst([BOW_TAG, EOW_TAG, BOS_TAG, EOS_TAG])
for word, index in vocab:
char_vocab.add_word_lst(list(word))
self.bos_index, self.eos_index, self._pad_index = len(vocab), len(vocab) + 1, vocab.padding_idx
# 根据char_lexicon调整, 多设置一位,是预留给word padding的(该位置的char表示为全0表示)
char_emb_layer = nn.Embedding(len(char_vocab) + 1, int(config['char_cnn']['embedding']['dim']),
padding_idx=len(char_vocab))
# 读入预训练权重 这里的elmo_model 包含char_cnn和 lstm 的 state_dict
elmo_model = torch.load(os.path.join(self.model_dir, weight_file), map_location='cpu')
char_embed_weights = elmo_model["char_cnn"]['char_emb_layer.weight']
found_char_count = 0
for char, index in char_vocab: # 调整character embedding
if char in char_lexicon:
index_in_pre = char_lexicon.get(char)
found_char_count += 1
else:
index_in_pre = char_lexicon[OOV_TAG]
char_emb_layer.weight.data[index] = char_embed_weights[index_in_pre]
logger.info(f"{found_char_count} out of {len(char_vocab)} characters were found in pretrained elmo embedding.")
# 生成words到chars的映射
max_chars = config['char_cnn']['max_characters_per_token']
self.register_buffer('words_to_chars_embedding', torch.full((len(vocab) + 2, max_chars),
fill_value=len(char_vocab),
dtype=torch.long))
for word, index in list(iter(vocab)) + [(BOS_TAG, len(vocab)), (EOS_TAG, len(vocab) + 1)]:
if len(word) + 2 > max_chars:
word = word[:max_chars - 2]
if index == self._pad_index:
continue
elif word == BOS_TAG or word == EOS_TAG:
char_ids = [char_vocab.to_index(BOW_TAG)] + [char_vocab.to_index(word)] + [
char_vocab.to_index(EOW_TAG)]
char_ids += [char_vocab.to_index(PAD_TAG)] * (max_chars - len(char_ids))
else:
char_ids = [char_vocab.to_index(BOW_TAG)] + [char_vocab.to_index(c) for c in word] + [
char_vocab.to_index(EOW_TAG)]
char_ids += [char_vocab.to_index(PAD_TAG)] * (max_chars - len(char_ids))
self.words_to_chars_embedding[index] = torch.LongTensor(char_ids)
self.char_vocab = char_vocab
self.token_embedder = ConvTokenEmbedder(
config, self.weight_file, None, char_emb_layer)
elmo_model["char_cnn"]['char_emb_layer.weight'] = char_emb_layer.weight
self.token_embedder.load_state_dict(elmo_model["char_cnn"])
self.output_dim = config['lstm']['projection_dim']
# lstm encoder
self.encoder = ElmobiLm(config)
self.encoder.load_state_dict(elmo_model["lstm"])
if cache_word_reprs:
if config['char_cnn']['embedding']['dim'] > 0: # 只有在使用了chars的情况下有用
logger.info("Start to generate cache word representations.")
batch_size = 320
# bos eos
word_size = self.words_to_chars_embedding.size(0)
num_batches = word_size // batch_size + \
int(word_size % batch_size != 0)
self.cached_word_embedding = nn.Embedding(word_size,
config['lstm']['projection_dim'])
with torch.no_grad():
for i in range(num_batches):
words = torch.arange(i * batch_size,
min((i + 1) * batch_size, word_size)).long()
chars = self.words_to_chars_embedding[words].unsqueeze(1) # batch_size x 1 x max_chars
word_reprs = self.token_embedder(words.unsqueeze(1),
chars).detach() # batch_size x 1 x config['encoder']['projection_dim']
self.cached_word_embedding.weight.data[words] = word_reprs.squeeze(1)
logger.info("Finish generating cached word representations. Going to delete the character encoder.")
del self.token_embedder, self.words_to_chars_embedding
else:
logger.info("There is no need to cache word representations, since no character information is used.")
def forward(self, words):
r"""
:param words: batch_size x max_len
:return: num_layers x batch_size x max_len x hidden_size
"""
# 扩展<bos>, <eos>
batch_size, max_len = words.size()
expanded_words = words.new_zeros(batch_size, max_len + 2) # 因为pad一定为0,
seq_len = words.ne(self._pad_index).sum(dim=-1)
expanded_words[:, 1:-1] = words
expanded_words[:, 0].fill_(self.bos_index)
expanded_words[torch.arange(batch_size).to(words), seq_len + 1] = self.eos_index
seq_len = seq_len + 2
zero_tensor = expanded_words.new_zeros(expanded_words.shape)
mask = (expanded_words == zero_tensor).unsqueeze(-1)
if hasattr(self, 'cached_word_embedding'):
token_embedding = self.cached_word_embedding(expanded_words)
else:
if hasattr(self, 'words_to_chars_embedding'):
chars = self.words_to_chars_embedding[expanded_words]
else:
chars = None
token_embedding = self.token_embedder(expanded_words, chars) # batch_size x max_len x embed_dim
encoder_output = self.encoder(token_embedding, seq_len)
if encoder_output.size(2) < max_len + 2:
num_layers, _, output_len, hidden_size = encoder_output.size()
dummy_tensor = encoder_output.new_zeros(num_layers, batch_size,
max_len + 2 - output_len, hidden_size)
encoder_output = torch.cat((encoder_output, dummy_tensor), 2)
sz = encoder_output.size() # 2, batch_size, max_len, hidden_size
token_embedding = token_embedding.masked_fill(mask, 0)
token_embedding = torch.cat((token_embedding, token_embedding), dim=2).view(1, sz[1], sz[2], sz[3])
encoder_output = torch.cat((token_embedding, encoder_output), dim=0)
# 删除<eos>, <bos>. 这里没有精确地删除,但应该也不会影响最后的结果了。
encoder_output = encoder_output[:, :, 1:-1]
return encoder_output