SortDataset

SortDataset 是一个用于解决排序问题的 PyTorch 数据集类。排序问题是一类基本的计算问题,它的目标是将一组数字按照一定的顺序(通常是从小到大或从大到小)排列。尽管排序问题在计算机科学中是一个经典问题,但在机器学习深度学习的上下文中,训练模型来解决排序问题仍然是一个有趣和有挑战性的任务。

SortDataset 的设计目标是为排序问题提供一个适合训练深度学习模型的数据集。它生成一组随机的整数序列作为输入,然后生成相应的排序序列作为目标输出。这种设计使得 SortDataset 可以用于训练各种类型的模型,包括但不限于循环神经网络RNN)、长短期记忆网络LSTM)和 Transformer 网络。

SortDataset 的意义在于,它提供了一个简单但有效的方式来生成用于训练和测试排序模型的数据。通过使用 SortDataset,研究人员和开发人员可以专注于模型的设计和优化,而不需要花费大量的时间和精力来处理数据生成和处理的问题。此外,由于 SortDataset 生成的数据是随机的,因此它可以生成几乎无限量的训练和测试数据,这对于训练深度学习模型来说是非常重要的。

代码实现

import pickle

class SortDataset(Dataset):
    """
    排序问题的数据集,比如长度为 6 下的问题
    Input: 0 0 2 1 0 1 -> Output: 0 0 0 1 1 2
    这将被连接到transformer中,形成如下输入:
    input:  0 0 2 1 0 1 0 0 0 1 1
    output: I I I I I 0 0 0 1 1 2
    其中I表示“忽略”,因为transformer正在读取输入序列。
    问题:这里的 I 忽略是什么意思?
    """

    def __init__(self, split, length=6, num_digits=3):
	    """
	    split 指定了数据集的划分('train' 或 'test')
	    length 指定了问题的长度
	    num_digits 参数指定了数字的数量
	    """
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
        self.num_digits = num_digits
    
    def __len__(self):
	    """
	    返回数据集的长度
	    """
        return 10000 # ...
    
    def get_vocab_size(self):
	    """
	    返回词汇表的大小,即数字的数量
	    比如传入3,总共有数字 0,1,2
	    会生成三个嵌入向量,来表示这三个数字
	    """
        return self.num_digits
    
    def get_block_size(self):
	    """
	    传入 transformer 模型的序列长度
	    序列将输入与输出拼接到一起
	    但是-1是因为,transformer 从最后一个输入元素
	    开始进行预测
	    """
        return self.length * 2 - 1

    def __getitem__(self, idx):
	    """
	    接收一个索引 idx, 并返回该索引对应的数据项
	    首先使用拒绝采样从所需的划分中生成一个输入示例
	    然后,它解决了排序任务,即对输入进行排序。
	    最后,它将问题的规范和解决方案连接起来
	    并返回给 transformer 的输入和输出。
	    """
        
        # 使用拒绝采样从所需的划分中生成一个输入示例
        while True:
            # 生成一组随机数
            inp = torch.randint(
		            self.num_digits, 
		            size=(self.length,), 
		            dtype=torch.long)
			# 以一定的概率(在这里是50%)
			# 检查生成的输入序列 `inp` 中的唯一元素的数量
			# 是否大于序列长度的一半,并在满足条件时重新生成输入序列。
            if torch.rand(1).item() < 0.5:
                if inp.unique().nelement() > self.length // 2:
                    # 重复元素太少了,重新采样
                    continue
            # inp.tolist()将张量转为Python列表
            # pickle.dumps将python对象序列化为字符串
            # hash返回字符串对应的hash值
            h = hash(pickle.dumps(inp.tolist()))
            # 任意整数被 4 整除的概率是 1/4
            # 大约 25% 的输入序列被分到测试集
            # 剩下的 75% 被分到训练集
            inp_split = 'test' if h % 4 == 0 else 'train' 
            # 检查 `inp_split` 是否等于 `self.split`
            # `self.split` 是在创建 `SortDataset` 对象时指定的
            # 它表示这个 `SortDataset` 对象应该包含训练集的数据还是测试集的数据。
            # 如果 `inp_split` 等于 `self.split`
            # 那么这个输入序列 `inp` 就是我们想要的,所以跳出循环;否则,继续循环,生成新的输入序列。
            if inp_split == self.split:
                break # ok
        
        # 对生成的序列进行排序
        # 第一个元素是排序后的张量
        sol = torch.sort(inp)[0]

        # 将问题和答案拼接到一起
        cat = torch.cat((inp, sol), dim=0)

        # 生成 Transformer 模型的输入和输出序列
        # 并确保只在输出位置进行预测。
        # 这是因为在这个排序问题中,我们的目标是预测排序后的序列,而不是原始的输入序列。
        # 我们需要在输入位置屏蔽掉损失,以确保模型只在输出位置进行预测。
        # 它是 `cat` 张量的前 `n-1` 个元素的副本,
        # 这个新的张量 `x` 将作为 Transformer 模型的输入。
        x = cat[:-1].clone()
        # 它是 `cat` 张量的后 `n-1` 个元素的副本。
        # 这个新的张量 `y` 将作为 Transformer 模型的目标输出。
        y = cat[1:].clone()
        # 将 `y` 张量的前 `self.length-1` 个元素设置为 -1。
        # 只想在输出位置进行预测,而不是在输入位置。
        # 设置为 -1 的元素在计算损失时将被忽略,这样就可以避免在输入位置的损失对模型训练产生影响。
        
        # 如果 `cat` 是 `[1, 3, 2, 1, 2, 3]`
        # `x` 就会是 `[1, 3, 2, 1, 2]`
        # `y` 就会是 `[3, 2, 1, 2, 3]`。
        y[:self.length-1] = -1
        # `y` 就会是 `[-1, -1, -1, -1, 3]`。
        return x, y

相关主题:

GPT

创建训练集和测试集

# print an example instance of the dataset
train_dataset = SortDataset('train')
test_dataset = SortDataset('test')
x, y = train_dataset[0]
for a, b in zip(x,y):
    print(int(a),int(b))

打印效果(训练集举例):

1 -1
0 -1
1 -1
0 -1
0 -1
0 0
0 0
0 0
0 0
0 1
1 1

创建 GPT 模型

from mingpt.model import GPT

model_config = GPT.get_default_config()
model_config.model_type = 'gpt-nano'
# 数字的个数
model_config.vocab_size = train_dataset.get_vocab_size()
# 序列的个数
model_config.block_size = train_dataset.get_block_size()
model = GPT(model_config)

GPT 模型内部会打印出参数量:

number of parameters: 0.09M

创建训练器

# create a Trainer object
from mingpt.trainer import Trainer

train_config = Trainer.get_default_config()
train_config.learning_rate = 5e-4 # the model we're using is so small that we can go a bit faster
train_config.max_iters = 2000
train_config.num_workers = 0
trainer = Trainer(train_config, model, train_dataset)

执行训练

def batch_end_callback(trainer):
    if trainer.iter_num % 100 == 0:
        print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")

trainer.set_callback('on_batch_end', batch_end_callback)
trainer.run()

再每一个 batch 训练完成后,进行一次打印:

iter_dt 0.00ms; iter 0: train loss 1.06449
iter_dt 139.15ms; iter 100: train loss 0.12883
iter_dt 64.77ms; iter 200: train loss 0.05456
iter_dt 95.64ms; iter 300: train loss 0.05620
iter_dt 94.22ms; iter 400: train loss 0.03233
iter_dt 105.66ms; iter 500: train loss 0.02521
iter_dt 69.31ms; iter 600: train loss 0.01017
iter_dt 62.40ms; iter 700: train loss 0.04061
iter_dt 97.62ms; iter 800: train loss 0.00331
iter_dt 80.93ms; iter 900: train loss 0.01802
iter_dt 77.46ms; iter 1000: train loss 0.00207
iter_dt 98.57ms; iter 1100: train loss 0.00667
iter_dt 126.75ms; iter 1200: train loss 0.01013
iter_dt 129.00ms; iter 1300: train loss 0.01998
iter_dt 81.17ms; iter 1400: train loss 0.01196
iter_dt 106.60ms; iter 1500: train loss 0.00717
iter_dt 95.69ms; iter 1600: train loss 0.02379
iter_dt 81.23ms; iter 1700: train loss 0.00058
iter_dt 96.82ms; iter 1800: train loss 0.00134
iter_dt 98.10ms; iter 1900: train loss 0.01036

这里训练的,应该是根据传入的序列,预测序列的最后一个值。

将模型切换为评估模式

# now let's perform some evaluation
model.eval()

对测试集进行推理

这段代码的主要目的是评估模型在训练集和测试集上的性能。它通过eval_split函数在给定的数据集上运行模型,并比较模型的输出与真实的目标序列。如果模型的输出与目标序列不匹配,它会记录并打印错误。最后,它计算并返回模型在所有样本上的正确率。

# 定义一个函数eval_split,接受三个参数:
#     - trainer(训练器)
#     - split(数据集分割类型)
#     - max_batches(最大批次数)
def eval_split(trainer, split, max_batches):
    # 根据split参数选择训练集或测试集
    dataset = {
	    'train':train_dataset, 
	    'test':test_dataset
	}[split]
    # 获取训练集的长度
    n = train_dataset.length 
    # 初始化一个空列表,用于存储结果
    results = []
    # 初始化一个变量,用于记录已经打印的错误数量
    mistakes_printed_already = 0
    # 创建一个数据加载器,设置批次大小为100,工作进程数为0,不丢弃最后一个不完整的批次
    loader = DataLoader(dataset, batch_size=100, num_workers=0, drop_last=False)
    # 对数据加载器中的每个批次进行遍历
    for b, (x, y) in enumerate(loader):
        # 将输入和目标数据转移到训练器所在的设备
        x = x.to(trainer.device)
        y = y.to(trainer.device)
        # 提取输入模式
        inp = x[:, :n]
        # 提取目标序列
        sol = y[:, -n:]
        # 让模型生成剩余的序列
        cat = model.generate(inp, n, do_sample=False) 
        # 提取生成的序列
        sol_candidate = cat[:, n:] 
        # 将预测的序列与真实的序列进行比较
        correct = (sol == sol_candidate).all(1).cpu() 
        # 遍历每个样本
        for i in range(x.size(0)):
            # 将比较结果添加到结果列表中
            results.append(int(correct[i]))
            # 如果预测错误,并且已打印的错误数少于3个,则打印错误信息
            if not correct[i] and mistakes_printed_already < 3: 
                mistakes_printed_already += 1
                print("GPT claims that %s sorted is %s but gt is %s" % (inp[i].tolist(), sol_candidate[i].tolist(), sol[i].tolist()))
        # 如果已经达到最大批次数,则跳出循环
        if max_batches is not None and b+1 >= max_batches:
            break
    # 将结果列表转换为张量
    rt = torch.tensor(results, dtype=torch.float)
    # 打印最终的评分
    print("%s final score: %d/%d = %.2f%% correct" % (split, rt.sum(), len(results), 100*rt.mean()))
    # 返回总分
    return rt.sum()

# 在不需要计算梯度的情况下运行以下代码
with torch.no_grad():
    # 计算训练集的评分
    train_score = eval_split(trainer, 'train', max_batches=50)
    # 计算测试集的评分
    test_score  = eval_split(trainer, 'test',  max_batches=50)

单 Case 测试

# let's run a random given sequence through the model as well
n = train_dataset.length # naugy direct access shrug
inp = torch.tensor(
		[[0, 0, 2, 1, 0, 1]], 
		dtype=torch.long
	).to(trainer.device)
	
assert inp[0].nelement() == n
with torch.no_grad():
    cat = model.generate(inp, n, do_sample=False)

sol = torch.sort(inp[0])[0]
sol_candidate = cat[:, n:]

print('input sequence  :', inp.tolist())
print('predicted sorted:', sol_candidate.tolist())
print('gt sort         :', sol.tolist())
print('matches         :', bool((sol == sol_candidate).all()))

打印结果:

input sequence  : [[0, 0, 2, 1, 0, 1]]
predicted sorted: [[0, 0, 0, 1, 1, 2]]
gt sort         :  [0, 0, 0, 1, 1, 2]
matches         : True

示例

其它示例

以下是一个使用 SortDataset 类的示例:

# 创建一个训练数据集,问题长度为 6,数字数量为 3
dataset = SortDataset('train', length=6, num_digits=3)

# 获取数据集的长度
print(len(dataset)) # 输出:10000

# 获取数据集的词汇表大小
print(dataset.get_vocab_size()) # 输出:3

# 获取数据集的块大小
print(dataset.get_block_size()) # 输出:11

# 获取索引为 0 的数据项
x, y = dataset[0]
# 输出:一个长度为 11 的张量
# tensor([1, 2, 0, 2, 1, 1, 0, 1, 1, 1, 2])
print(x) 
# 输出:一个长度为 11 的张量,其中前 5 个元素为 -1
# tensor([-1, -1, -1, -1, -1, 0, 1, 1, 1, 2, 2])
print(y) 

以上就是关于 PyTorch 的 SortDataset 类的学习笔记,希望对你有所帮助。

词汇表的大小

自然语言处理NLP)中,词汇表通常指的是我们在处理的文本数据中所有不同的单词的集合。例如,如果我们有一个包含 "cat", "dog", "fish" 的文本数据,那么这个文本的词汇表就是 {"cat", "dog", "fish"}。

在这个 SortDataset 数据集中,我们并不是处理文本数据,而是处理数字。所以这里的"词汇表"实际上就是所有可能的数字的集合。例如,如果 num_digits 是 3,那么我们的"词汇表"就是 {0, 1, 2},因为这就是我们在数据集中可能遇到的所有不同的数字。

这就是为什么我们说"词汇表的大小,即数字的数量"。在这个特定的数据集中,我们的"词汇"实际上就是数字,所以词汇表的大小就是我们有多少个不同的数字。

现在,让我们来谈谈 Transformer模型。Transformer 是一种深度学习模型,它在处理序列数据,特别是在 NLP 领域,有着非常好的表现。Transformer 的一个关键特性是它能够处理输入序列中的每个元素,而不需要像 RNN 或 LSTM 那样按照特定的顺序处理它们。这使得 Transformer 能够更好地处理长序列,并且能够更好地捕捉序列中的长距离依赖关系。

在 Transformer 模型中,我们通常会将输入序列的每个元素(在 NLP 中,这通常是一个单词;在 SortDataset 中,这是一个数字)转换为一个向量,这个向量就是这个元素的"嵌入"。这个嵌入向量是通过学习得到的,它能够捕捉到元素的一些重要特性。例如,在 NLP 中,相似的单词通常会有相似的嵌入向量。

这就是为什么我们需要知道词汇表的大小。我们需要为词汇表中的每个元素(在这个例子中,就是每个数字)学习一个嵌入向量。所以,词汇表的大小就决定了我们需要学习多少个嵌入向量。


本文作者:Maeiee

本文链接:SortDataset

版权声明:如无特别声明,本文即为原创文章,版权归 Maeiee 所有,未经允许不得转载!


喜欢我文章的朋友请随缘打赏,鼓励我创作更多更好的作品!