Table of Contents

使用多层感知机(MLP)拟合y = x^4 + 3x^3 + 10x + 114514

代码

代码中包含了:实验设置,准备数据集,定义模型,和训练代码。

将数据集按照7:1:2的比例划分为训练集、验证集和测试集,使用Huber损失函数和Adam优化器(带权重衰减,weight_decay=1e-4),学习率为0.005,进行100个epochs的训练。

评价指标为平均绝对误差(MAE)和均方根误差(RMSE)。

import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler

'''
Config
'''
# Fractions of the dataset assigned to the train / validation / test splits.
train_ratio = 0.7
val_ratio = 0.1
test_ratio = 0.2

# Seed shared by the RNG seeding calls and the index splits.
seed = 3407

# Layer widths handed to the MLP constructor.
input_dim = 3
hidden_dim = 8
output_dim = 3

# Optimisation hyper-parameters.
num_epochs = 100
learning_rate = 0.005
weight_decay = 1e-4
batch_size = 256

# CUDA is used only when this flag is set and a device is actually available.
use_cuda = False

'''
Prepare dataset.
'''

def func_to_fit(x):
    """Evaluate the target polynomial x^4 + 3x^3 + 10x + 114514.

    Works for anything that supports ``**``, ``*`` and ``+`` with numbers
    (plain numbers, tensors, arrays); the operations are applied elementwise
    by the operand's own arithmetic.
    """
    quartic_term = x ** 4
    cubic_term = 3 * (x ** 3)
    linear_term = 10 * x
    # Summed left to right, so floating-point rounding matches the one-line form.
    return quartic_term + cubic_term + linear_term + 114514

class XYDataset(Dataset):
    """Map-style dataset over samples stored column-wise.

    Both tensors are laid out as ``(features, samples)``: column ``i`` of
    ``input_tensors`` is the i-th input vector and column ``i`` of
    ``target_tensors`` is the matching target vector. The two tensors may
    have different feature counts but must have the same number of columns.

    Args:
        input_tensors: 2-D tensor of shape ``(input_features, num_samples)``.
        target_tensors: 2-D tensor of shape ``(target_features, num_samples)``.

    Raises:
        ValueError: if either tensor is not 2-D, or the sample counts differ.
    """

    def __init__(self, input_tensors, target_tensors):
        # Explicit raises instead of ``assert``: assertions vanish under ``python -O``.
        if input_tensors.dim() != 2 or target_tensors.dim() != 2:
            raise ValueError(
                'XYDataset expects 2-D (features, samples) tensors, got shapes '
                f'{tuple(input_tensors.shape)} and {tuple(target_tensors.shape)}.'
            )
        if input_tensors.shape[-1] != target_tensors.shape[-1]:
            raise ValueError(
                'Inputs and targets must have the same number of samples, got '
                f'{input_tensors.shape[-1]} and {target_tensors.shape[-1]}.'
            )
        # Keep transposed views, shape (samples, features), and slice on demand
        # rather than building one Python-level tensor object per sample up front.
        self.input_tensors = input_tensors.t()
        self.target_tensors = target_tensors.t()

    def __len__(self):
        """Return the number of samples."""
        return self.input_tensors.shape[0]

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair for sample ``idx``.

        Each element keeps its feature axis, so a single-feature sample has
        shape ``(1,)`` and batches come out as ``(batch, features)``.
        """
        return self.input_tensors[idx], self.target_tensors[idx]

# Draw the inputs from a dedicated generator seeded with `seed`, so the dataset
# is identical on every run. The global RNG seeding in the pipeline section runs
# only after this point and therefore cannot make this draw reproducible.
_data_rng = torch.Generator().manual_seed(seed)
# Layout is (features, samples): each column is one sample with `input_dim` features.
input_tensors = torch.randn((input_dim, int(2e5)), generator=_data_rng)
if use_cuda and torch.cuda.is_available():
    input_tensors = input_tensors.cuda()
# func_to_fit is applied elementwise, so the targets have the same shape as the
# inputs (the model's output_dim therefore has to equal input_dim).
target_tensors = func_to_fit(input_tensors)
dataset = XYDataset(input_tensors, target_tensors)

'''
Introduce a MLP to fit the function 'func_to_fit'.
'''

class MLP(nn.Module):
    """Fully connected network: Linear -> ReLU -> Linear -> ReLU -> Linear.

    Args:
        input_dim: size of the last axis of the input.
        hidden_dim: width of both hidden layers.
        output_dim: size of the last axis of the output.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # Build the linear layers in input-to-output order; bias is enabled,
        # which is nn.Linear's default.
        first_linear = nn.Linear(input_dim, hidden_dim)
        second_linear = nn.Linear(hidden_dim, hidden_dim)
        output_linear = nn.Linear(hidden_dim, output_dim)
        # No activation after the last layer: the network outputs raw regression values.
        self.model = nn.Sequential(
            first_linear,
            nn.ReLU(),
            second_linear,
            nn.ReLU(),
            output_linear,
        )

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        prediction = self.model(x)
        return prediction

'''
Pipeline
'''
# Seed the Python, NumPy and PyTorch RNGs, in that order.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed)

# First carve the train split off the full index list, then divide the
# remainder into validation and test according to their relative shares.
indices = list(range(len(dataset)))
_holdout_fraction = test_ratio + val_ratio
train_indices, _temp = train_test_split(indices, test_size=_holdout_fraction, random_state=seed)
_test_share = test_ratio / (test_ratio + val_ratio)
val_indices, test_indices = train_test_split(_temp, test_size=_test_share, random_state=seed)

# One random sampler per split, all drawing from the same underlying dataset.
train_sampler, val_sampler, test_sampler = (
    SubsetRandomSampler(_subset) for _subset in (train_indices, val_indices, test_indices)
)

train_loader, val_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, sampler=_sampler)
    for _sampler in (train_sampler, val_sampler, test_sampler)
)

def main():
    """Train the MLP, keep the best checkpoint by validation loss, and report test metrics.

    Uses the module-level configuration and the train / validation / test
    loaders. Each epoch prints the mean of the per-batch Huber losses for the
    train and validation splits; whenever the validation loss improves, the
    model's state dict is written to ``./best_model.pth``. After training, the
    best checkpoint is reloaded and MAE / RMSE over the whole test split are
    printed.
    """
    # Resolve the device once instead of re-checking CUDA availability per batch.
    device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')

    model = MLP(input_dim, hidden_dim, output_dim)
    if device.type == 'cuda':
        torch.cuda.manual_seed(seed)
    model = model.to(device)
    criterion = nn.HuberLoss(delta=2.0)
    # NOTE: torch.optim.Adam applies weight_decay as an L2 penalty added to the
    # gradient; torch.optim.AdamW would be the decoupled variant.
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    best_epoch, min_val_loss = -1, float('inf')
    best_model_pth = './best_model.pth'

    for epoch in range(num_epochs):
        train_losses, val_losses = [], []
        model.train()
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            hat_y = model(x)
            loss = criterion(hat_y, y)
            train_losses.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        model.eval()
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                hat_y = model(x)
                loss = criterion(hat_y, y)
                val_losses.append(loss.item())
        train_loss, val_loss = np.mean(np.array(train_losses)), np.mean(np.array(val_losses))
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        if val_loss < min_val_loss:
            torch.save(model.state_dict(), best_model_pth)
            print(f'Val Loss decreased from {min_val_loss:.4f} to {val_loss:.4f}, saved model to \'{best_model_pth}\'.')
            min_val_loss = val_loss
            best_epoch = epoch

    if best_epoch >= 0:
        # map_location keeps the load working when the checkpoint was written on another device.
        model.load_state_dict(torch.load(best_model_pth, map_location=device))
        print(f'Loaded model at epoch [{best_epoch + 1}/{num_epochs}].')
    else:
        # Nothing was saved during this run (e.g. the validation loss was never
        # below inf); do not pick up a checkpoint left behind by an earlier run.
        print('No checkpoint was saved during this run; evaluating the final weights.')

    # Accumulate error sums over every predicted value so the metrics describe
    # the whole test split. Averaging per-batch RMSEs is not the RMSE of the
    # split (the square root is nonlinear), and averaging per-batch means
    # over-weights a short final batch.
    abs_error_sum, sq_error_sum, num_values = 0.0, 0.0, 0
    model.eval()
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            hat_y = model(x)
            abs_error_sum += F.l1_loss(hat_y, y, reduction='sum').item()
            sq_error_sum += F.mse_loss(hat_y, y, reduction='sum').item()
            num_values += y.numel()

    if num_values > 0:
        test_mae = abs_error_sum / num_values
        test_rmse = (sq_error_sum / num_values) ** 0.5
    else:
        # Empty test split: report NaN rather than dividing by zero.
        test_mae = test_rmse = float('nan')

    print(f'Test Loss : MAE = {test_mae:.4f}, RMSE = {test_rmse:.4f}')

if __name__ == '__main__':
    main()

实验记录

Epoch [1/100], Train Loss: 223800.9959, Val Loss: 206843.0257
Val Loss decreased from inf to 206843.0257, saved model to './best_model.pth'.
Epoch [2/100], Train Loss: 148187.2498, Val Loss: 73566.3437
Val Loss decreased from 206843.0257 to 73566.3437, saved model to './best_model.pth'.
Epoch [3/100], Train Loss: 40582.0299, Val Loss: 24481.6588
Val Loss decreased from 73566.3437 to 24481.6588, saved model to './best_model.pth'.
...
Epoch [34/100], Train Loss: 25.8258, Val Loss: 20.6515
Val Loss decreased from 23.2906 to 20.6515, saved model to './best_model.pth'.
Epoch [35/100], Train Loss: 23.9975, Val Loss: 23.1034
...
Epoch [51/100], Train Loss: 20.7278, Val Loss: 12.9897
Val Loss decreased from 15.5006 to 12.9897, saved model to './best_model.pth'.
...
Epoch [75/100], Train Loss: 18.5427, Val Loss: 9.4123
Val Loss decreased from 10.7232 to 9.4123, saved model to './best_model.pth'.
...
Epoch [100/100], Train Loss: 19.6701, Val Loss: 20.9654
Loaded model at epoch [75/100].
Test Loss : MAE = 6.0065, RMSE = 31.6396
最后更新于 2025-01-27