fastNLP.core.batch

batch 模块实现了 fastNLP 所需的 DataSetIter 类。

class fastNLP.core.batch.BatchIter(dataset, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, collate_fn=None, batch_sampler=None)[源代码]

别名 fastNLP.BatchIter fastNLP.core.batch.BatchIter

Trainer用于迭代数据的类。继承该类,并实现get_num_batches(), get_batch_indices(), num_batches(), __iter__()方法以及dataset属性。

static get_num_batches(num_samples, batch_size, drop_last)[源代码]

计算batch的数量。用于前端显示进度

参数
  • num_samples (int) --

  • batch_size (int) --

  • drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。

返回

get_batch_indices()[源代码]

获取最近输出的batch的index。用于溯源当前batch的数据

返回

property dataset

获取正在参与iterate的dataset

返回

class fastNLP.core.batch.DataSetIter(dataset, batch_size=1, sampler=None, as_numpy=False, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]

基类 fastNLP.BatchIter

别名 fastNLP.DataSetIter fastNLP.core.batch.DataSetIter

DataSetIter 用于从 DataSet 中按一定的顺序, 依次按 batch_size 的大小将数据取出,通过使用DataSetIter,可以不需要考虑

输入的padding(由DataSet中每列的Padder决定了)以及不需要考虑将数据转为tensor。

组成 xy:

batch = DataSetIter(data_set, batch_size=16, sampler=SequentialSampler())
num_batch = len(batch)
for batch_x, batch_y in batch:
    # do stuff ...
__init__(dataset, batch_size=1, sampler=None, as_numpy=False, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]
参数
  • dataset -- DataSet 对象, 数据集

  • batch_size (int) -- 取出的batch大小

  • sampler --

    规定使用的 Sampler 方式. 若为 None , 使用 SequentialSampler.

    Default: None

  • as_numpy (bool) --

    若为 True , 输出batch为 numpy.array. 否则为 torch.Tensor.

    Default: False

  • num_workers (int) -- 使用多少个进程来预处理数据

  • pin_memory (bool) -- 是否将产生的tensor使用pin memory, 可能会加快速度。

  • drop_last (bool) -- 如果最后一个batch没有batch_size这么多sample,就扔掉最后一个

  • timeout -- 生成一个batch的timeout值

  • worker_init_fn -- 在每个worker启动时调用该函数,会传入一个值,该值是worker的index。

  • batch_sampler -- 当每次batch取出的数据数量不一致时,可以使用该sampler。batch_sampler每次iter应该输出一个list的index。 当batch_sampler不为None时,参数batch_size, sampler, drop_last会被忽略。

property dataset

获取正在参与iterate的dataset

返回

get_batch_indices()

获取最近输出的batch的index。用于溯源当前batch的数据

返回

static get_num_batches(num_samples, batch_size, drop_last)

计算batch的数量。用于前端显示进度

参数
  • num_samples (int) --

  • batch_size (int) --

  • drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。

返回

class fastNLP.core.batch.TorchLoaderIter(dataset, collate_fn, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]

基类 fastNLP.BatchIter

别名 fastNLP.TorchLoaderIter fastNLP.core.batch.TorchLoaderIter

与DataSetIter类似,但可以用于非fastNLP的数据容器对象,以及可以实现完全自定义的生成batch的方式,然后与Trainer,Tester可以实现

与DataSetIter一样的对接。

需要保证传入的数据容器实现了实现了以下的方法

Example:

import random
from fastNLP import TorchLoaderIter
import torch
class UdfDataSet:
    def __init__(self, num_samples):
        self.num_samples = num_samples

    def __getitem__(self, idx):  # 必须实现的方法,输入参数是一个int,范围为[0, len(self))
        x = [random.random() for _ in range(3)]
        y = random.random()
        return x,y

    def __len__(self):  # 需要实现该方法返回值需要是一个int数据
        return self.num_samples

# 需要实现collact_fn将数据转换为tensor
def collate_fn(data_list):
    # [(x1,y1), (x2,y2), ...], 这里的输入实际上是将UdfDataSet的__getitem__输入结合为list
    xs, ys = [], []
    for l in data_list:
        x, y = l
        xs.append(x)
        ys.append(y)
    # 不需要转移到gpu,Trainer或Tester会将其转移到model所在的device
    x,y = torch.FloatTensor(xs), torch.FloatTensor(ys)
    return {'x':x, 'y':y}, {'y':y}  # 第一个dict中内容类似于DataSet中的input列,第二个dict的内容类似于target列

udf_dataset = UdfDataSet(10)
dataset = TorchLoaderIter(udf_dataset, collate_fn=collate_fn)
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(3, 1)
    def forward(self, x, y):
        return {'loss':torch.pow(self.fc(x).squeeze(-1)-y, 2).sum()}
    def predict(self, x):
        return {'pred':self.fc(x).squeeze(0)}
model = Model()
trainer = Trainer(train_data=dataset, model=model, loss=None, print_every=2, dev_data=dataset,
                  metrics=AccuracyMetric(target='y'), use_tqdm=False)
trainer.train(load_best_model=False)

除此之外,还可以通过该方法实现OnTheFly的训练,如下面的代码所示

Example:

import tempfile
import random
import torch
tmp_file_handler, tmp_file_path = tempfile.mkstemp(text=True)
try:
    num_samples, data = 10, []
    for _ in range(num_samples):
        x, y = [random.random() for _ in range(3)], random.random()
        data.append(x + [y])
    with open(tmp_file_path, 'w') as f:
        for d in data:
            f.write(' '.join(map(str, d)) + '\n')

    class FileDataSet:
        def __init__(self, tmp_file):
            num_samples = 0
            line_pos = [0]  # 对应idx是某一行对应的位置
            self.tmp_file_handler = open(tmp_file, 'r', encoding='utf-8')
            line = self.tmp_file_handler.readline()
            while line:
                if line.strip():
                    num_samples += 1
                    line_pos.append(self.tmp_file_handler.tell())
                line = self.tmp_file_handler.readline()
            self.tmp_file_handler.seek(0)
            self.num_samples = num_samples
            self.line_pos = line_pos

        def __getitem__(self, idx):
            line_start, line_end = self.line_pos[idx], self.line_pos[idx + 1]
            self.tmp_file_handler.seek(line_start)
            line = self.tmp_file_handler.read(line_end - line_start).strip()
            values = list(map(float, line.split()))
            x, y = values[:3], values[-1]
            return x, y

        def __len__(self):
            return self.num_samples

    def collate_fn(data_list):
        # [(x1,y1), (x2,y2), ...], 这里的输入实际上是将UdfDataSet的__getitem__输入结合为list
        xs, ys = [], []
        for l in data_list:
            x, y = l
            xs.append(x)
            ys.append(y)
        x, y = torch.FloatTensor(xs), torch.FloatTensor(ys)
        return {'x': x, 'y': y}, {'y': y}  # 第一个dict中内容类似于DataSet中的input列,第二个dict的内容类似于target列

    file_data = FileDataSet(tmp_file_path)
    dataset = TorchLoaderIter(file_data, collate_fn=collate_fn)

    class Model(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc = nn.Linear(3, 1)

        def forward(self, x, y):
            return {'loss': torch.pow(self.fc(x).squeeze(-1) - y, 2).sum()}

        def predict(self, x):
            return {'pred': self.fc(x).squeeze(0)}

    model = Model()
    trainer = Trainer(train_data=dataset, model=model, loss=None, print_every=2, dev_data=dataset,
                      metrics=AccuracyMetric(target='y'), use_tqdm=False, n_epochs=2)
    trainer.train(load_best_model=False)

finally:
    import os
    if os.path.exists(tmp_file_path):
        os.remove(tmp_file_path)
__init__(dataset, collate_fn, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]
参数
  • dataset -- 实现了__getitem__和__len__方法的数据容器。

  • collate_fn (callable) -- 用于将样本组合成batch的函数。输入为[dataset[idx1], dataset[idx2], ...], 即dataset中 __getitem__返回值组成的list,返回值必须为两个dict,其中第一个dict会被认为是input,第二个dict中的内容被认为是target。 需要转换为tensor的数据,需要在collate_fn中转化,但不需要转移到对应device。

  • batch_size (int) -- 取出的batch大小

  • sampler -- 规定使用的 Sampler 方式. 若为 None , 使用 SequentialSampler. Default: None

  • num_workers (int) -- 使用多少个进程来预处理数据

  • pin_memory (bool) -- 是否将产生的tensor使用pin memory, 可能会加快速度。

  • drop_last (bool) -- 如果最后一个batch没有batch_size这么多sample,就扔掉最后一个

  • timeout -- 生成一个batch的timeout值

  • worker_init_fn -- 在每个worker启动时调用该函数,会传入一个值,该值是worker的index。

  • batch_sampler -- 当每次batch取出的数据数量不一致时,可以使用该sampler。batch_sampler每次iter应该输出一个list的index。 当batch_sampler不为None时,参数batch_size, sampler, drop_last会被忽略。

property dataset

获取正在参与iterate的dataset

返回

get_batch_indices()

获取最近输出的batch的index。用于溯源当前batch的数据

返回

static get_num_batches(num_samples, batch_size, drop_last)

计算batch的数量。用于前端显示进度

参数
  • num_samples (int) --

  • batch_size (int) --

  • drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。

返回