fastNLP.core.batch¶
batch 模块实现了 fastNLP 所需的 DataSetIter
类。
-
class
fastNLP.core.batch.
BatchIter
(dataset, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, collate_fn=None, batch_sampler=None)[源代码]¶ 别名
fastNLP.BatchIter
fastNLP.core.batch.BatchIter
Trainer用于迭代数据的类。继承该类,并实现get_num_batches(), get_batch_indices(), num_batches(), __iter__()方法以及dataset属性。
-
static
get_num_batches
(num_samples, batch_size, drop_last)[源代码]¶ 计算batch的数量。用于前端显示进度
- 参数
num_samples (int) --
batch_size (int) --
drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。
- 返回
-
property
dataset
¶ 获取正在参与iterate的dataset
- 返回
-
static
-
class
fastNLP.core.batch.
DataSetIter
(dataset, batch_size=1, sampler=None, as_numpy=False, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]¶ -
别名
fastNLP.DataSetIter
fastNLP.core.batch.DataSetIter
- DataSetIter 用于从 DataSet 中按一定的顺序, 依次按
batch_size
的大小将数据取出,通过使用DataSetIter,可以不需要考虑 输入的padding(由DataSet中每列的Padder决定了)以及不需要考虑将数据转为tensor。
组成 x 和 y:
batch = DataSetIter(data_set, batch_size=16, sampler=SequentialSampler()) num_batch = len(batch) for batch_x, batch_y in batch: # do stuff ...
-
__init__
(dataset, batch_size=1, sampler=None, as_numpy=False, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]¶ - 参数
dataset --
DataSet
对象, 数据集batch_size (int) -- 取出的batch大小
sampler --
规定使用的
Sampler
方式. 若为None
, 使用SequentialSampler
.Default:
None
as_numpy (bool) --
若为
True
, 输出batch为 numpy.array. 否则为torch.Tensor
.Default:
False
num_workers (int) -- 使用多少个进程来预处理数据
pin_memory (bool) -- 是否将产生的tensor使用pin memory, 可能会加快速度。
drop_last (bool) -- 如果最后一个batch没有batch_size这么多sample,就扔掉最后一个
timeout -- 生成一个batch的timeout值
worker_init_fn -- 在每个worker启动时调用该函数,会传入一个值,该值是worker的index。
batch_sampler -- 当每次batch取出的数据数量不一致时,可以使用该sampler。batch_sampler每次iter应该输出一个list的index。 当batch_sampler不为None时,参数batch_size, sampler, drop_last会被忽略。
-
property
dataset
¶ 获取正在参与iterate的dataset
- 返回
-
get_batch_indices
()¶ 获取最近输出的batch的index。用于溯源当前batch的数据
- 返回
-
static
get_num_batches
(num_samples, batch_size, drop_last)¶ 计算batch的数量。用于前端显示进度
- 参数
num_samples (int) --
batch_size (int) --
drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。
- 返回
- DataSetIter 用于从 DataSet 中按一定的顺序, 依次按
-
class
fastNLP.core.batch.
TorchLoaderIter
(dataset, collate_fn, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]¶ -
别名
fastNLP.TorchLoaderIter
fastNLP.core.batch.TorchLoaderIter
- 与DataSetIter类似,但可以用于非fastNLP的数据容器对象,以及可以实现完全自定义的生成batch的方式,然后与Trainer,Tester可以实现
与DataSetIter一样的对接。
需要保证传入的数据容器实现了实现了以下的方法
Example:
import random from fastNLP import TorchLoaderIter import torch class UdfDataSet: def __init__(self, num_samples): self.num_samples = num_samples def __getitem__(self, idx): # 必须实现的方法,输入参数是一个int,范围为[0, len(self)) x = [random.random() for _ in range(3)] y = random.random() return x,y def __len__(self): # 需要实现该方法返回值需要是一个int数据 return self.num_samples # 需要实现collact_fn将数据转换为tensor def collate_fn(data_list): # [(x1,y1), (x2,y2), ...], 这里的输入实际上是将UdfDataSet的__getitem__输入结合为list xs, ys = [], [] for l in data_list: x, y = l xs.append(x) ys.append(y) # 不需要转移到gpu,Trainer或Tester会将其转移到model所在的device x,y = torch.FloatTensor(xs), torch.FloatTensor(ys) return {'x':x, 'y':y}, {'y':y} # 第一个dict中内容类似于DataSet中的input列,第二个dict的内容类似于target列 udf_dataset = UdfDataSet(10) dataset = TorchLoaderIter(udf_dataset, collate_fn=collate_fn) class Model(nn.Module): def __init__(self): super().__init__() self.fc = nn.Linear(3, 1) def forward(self, x, y): return {'loss':torch.pow(self.fc(x).squeeze(-1)-y, 2).sum()} def predict(self, x): return {'pred':self.fc(x).squeeze(0)} model = Model() trainer = Trainer(train_data=dataset, model=model, loss=None, print_every=2, dev_data=dataset, metrics=AccuracyMetric(target='y'), use_tqdm=False) trainer.train(load_best_model=False)
除此之外,还可以通过该方法实现OnTheFly的训练,如下面的代码所示
Example:
import tempfile import random import torch tmp_file_handler, tmp_file_path = tempfile.mkstemp(text=True) try: num_samples, data = 10, [] for _ in range(num_samples): x, y = [random.random() for _ in range(3)], random.random() data.append(x + [y]) with open(tmp_file_path, 'w') as f: for d in data: f.write(' '.join(map(str, d)) + '\n') class FileDataSet: def __init__(self, tmp_file): num_samples = 0 line_pos = [0] # 对应idx是某一行对应的位置 self.tmp_file_handler = open(tmp_file, 'r', encoding='utf-8') line = self.tmp_file_handler.readline() while line: if line.strip(): num_samples += 1 line_pos.append(self.tmp_file_handler.tell()) line = self.tmp_file_handler.readline() self.tmp_file_handler.seek(0) self.num_samples = num_samples self.line_pos = line_pos def __getitem__(self, idx): line_start, line_end = self.line_pos[idx], self.line_pos[idx + 1] self.tmp_file_handler.seek(line_start) line = self.tmp_file_handler.read(line_end - line_start).strip() values = list(map(float, line.split())) x, y = values[:3], values[-1] return x, y def __len__(self): return self.num_samples def collate_fn(data_list): # [(x1,y1), (x2,y2), ...], 这里的输入实际上是将UdfDataSet的__getitem__输入结合为list xs, ys = [], [] for l in data_list: x, y = l xs.append(x) ys.append(y) x, y = torch.FloatTensor(xs), torch.FloatTensor(ys) return {'x': x, 'y': y}, {'y': y} # 第一个dict中内容类似于DataSet中的input列,第二个dict的内容类似于target列 file_data = FileDataSet(tmp_file_path) dataset = TorchLoaderIter(file_data, collate_fn=collate_fn) class Model(nn.Module): def __init__(self): super().__init__() self.fc = nn.Linear(3, 1) def forward(self, x, y): return {'loss': torch.pow(self.fc(x).squeeze(-1) - y, 2).sum()} def predict(self, x): return {'pred': self.fc(x).squeeze(0)} model = Model() trainer = Trainer(train_data=dataset, model=model, loss=None, print_every=2, dev_data=dataset, metrics=AccuracyMetric(target='y'), use_tqdm=False, n_epochs=2) trainer.train(load_best_model=False) finally: import os if os.path.exists(tmp_file_path): os.remove(tmp_file_path)
-
__init__
(dataset, collate_fn, batch_size=1, sampler=None, num_workers=0, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, batch_sampler=None)[源代码]¶ - 参数
dataset -- 实现了__getitem__和__len__方法的数据容器。
collate_fn (callable) -- 用于将样本组合成batch的函数。输入为[dataset[idx1], dataset[idx2], ...], 即dataset中 __getitem__返回值组成的list,返回值必须为两个dict,其中第一个dict会被认为是input,第二个dict中的内容被认为是target。 需要转换为tensor的数据,需要在collate_fn中转化,但不需要转移到对应device。
batch_size (int) -- 取出的batch大小
sampler -- 规定使用的
Sampler
方式. 若为None
, 使用SequentialSampler
. Default:None
num_workers (int) -- 使用多少个进程来预处理数据
pin_memory (bool) -- 是否将产生的tensor使用pin memory, 可能会加快速度。
drop_last (bool) -- 如果最后一个batch没有batch_size这么多sample,就扔掉最后一个
timeout -- 生成一个batch的timeout值
worker_init_fn -- 在每个worker启动时调用该函数,会传入一个值,该值是worker的index。
batch_sampler -- 当每次batch取出的数据数量不一致时,可以使用该sampler。batch_sampler每次iter应该输出一个list的index。 当batch_sampler不为None时,参数batch_size, sampler, drop_last会被忽略。
-
property
dataset
¶ 获取正在参与iterate的dataset
- 返回
-
get_batch_indices
()¶ 获取最近输出的batch的index。用于溯源当前batch的数据
- 返回
-
static
get_num_batches
(num_samples, batch_size, drop_last)¶ 计算batch的数量。用于前端显示进度
- 参数
num_samples (int) --
batch_size (int) --
drop_last (bool) -- 如果最后一个batch没有batch_size这么多,是否就丢掉。
- 返回