Skip to content

时序预测

约 687 字大约 2 分钟

CNNLSTMpython

2025-03-08

很久以前写的代码,那个时候transformer还没有火起来,现在万物皆可transformer了

使用CNN+LSTM,支持多特征,多步预测,K-折交叉验证

import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from torch.autograd import Variable
import os
import pandas as pd
from torchvision import transforms
import numpy as np
from sklearn.model_selection import KFold
#os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'



#全局参数
feature = ['feature1','feature2','feature3','feature4'] #特征列表,要预测的的feature放在第一个位置
hidden_size = 32  #隐藏层的维度
batch_size = 32  #批大小
sequence = 5  #序列长度
folds = 5  #K折交叉验证的折数
step_num = 100 #训练的步数
dataset_name = 'data.csv'  #数据集名称
prediction_name = 'predict.csv'  #预测数据的名称


#读取数据
df = pd.read_csv(dataset_name)
df = df.sort_index(ascending=True)
#print(df.head(5))
# 提取feature,并做标准化
df = df[feature]
min_max = dict.fromkeys(feature)
for i in feature:
    min_max[i] = [df[i].min(),df[i].max()]
df = df.apply(lambda x: (x - min(x)) / (max(x) - min(x)))
total_len = df.shape[0]
X = []
Y = []
for i in range(df.shape[0] - sequence):
    X.append(np.array(df.iloc[i:(i + sequence), ].values, dtype=np.float32))
    Y.append(np.array(df.iloc[(i + sequence), 0], dtype=np.float32))
class Mydataset(Dataset):

    def __init__(self, xx, yy, transform=None):
        self.x = xx
        self.y = yy
        self.tranform = transform

    def __getitem__(self, index):
        x1 = self.x[index]
        y1 = self.y[index]
        if self.tranform != None:
            return self.tranform(x1), y1
        return x1, y1

    def __len__(self):
        return len(self.x)
class CNN_LSTM(nn.Module):
    def __init__(self, args):
        super(CNN_LSTM, self).__init__()
        self.args = args
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels=args['in_channels'], out_channels=args['out_channels'], kernel_size=3),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=1)
        )
        self.lstm = nn.LSTM(input_size=args['out_channels'], hidden_size=args['hidden_size'],
                            num_layers=args['num_layers'], batch_first=True)
        self.fc = nn.Linear(args['hidden_size'], args['output_size'])
    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.conv(x)
        x = x.permute(0, 2, 1)
        x, _ = self.lstm(x)
        x = self.fc(x)
        x = x[:, -1, :]
        return x
kf = KFold(n_splits=folds)
folenum = 0
for train_index, test_index in kf.split(X):
    #model = lstm(len(feature),hidden_size,1)
    model = CNN_LSTM(args=dict(in_channels=len(feature), out_channels=hidden_size, hidden_size=hidden_size, num_layers=1, output_size=1))
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    trainx, trainy = np.array(X)[train_index], np.array(Y)[train_index]
    testx, testy = np.array(X)[test_index], np.array(Y)[test_index]
    train_loader = DataLoader(dataset=Mydataset(trainx, trainy, transform=transforms.ToTensor()), batch_size=batch_size,
                              shuffle=True)
    test_loader = DataLoader(dataset=Mydataset(testx, testy), batch_size=batch_size, shuffle=True)
    preds = []
    labels = []
    loss123 = []
    for i in range(step_num):
        total_loss = 0
        for idx, (data, label) in enumerate(train_loader):
            data1 = data.squeeze(1)
            pred = model(Variable(data1))
            label = label.unsqueeze(1)
            loss = criterion(pred, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            loss123.append(total_loss)
    import matplotlib.pyplot as plt

    plt.plot(loss123, "r", label="loss")
    plt.title('loss')
    plt.show()
    preds = []
    labels = []
    for idx, (x, label) in enumerate(test_loader):
        x = x.squeeze(1)  # batch_size,seq_len,input_size
        pred = model(x)
        preds.extend(pred.data.squeeze(1).tolist())
        labels.extend(label.tolist())
    plt.plot([ele * (list(min_max.values())[0][1] - list(min_max.values())[0][0]) + list(min_max.values())[0][0] for ele in preds], "r", label="pred")
    plt.plot([ele * (list(min_max.values())[0][1] - list(min_max.values())[0][0]) + list(min_max.values())[0][0] for ele in labels], "b", label="real")
    plt.title('prediction')
    plt.show()
    torch.save(model.state_dict(), (f'model-fold{folenum+1}.pt'))
    folenum += 1


# 开始测试
df2 = pd.read_csv(prediction_name)
df1 = df2.tail(sequence)
df1 = df1.sort_index(ascending=True)
df1 = df1[feature]
for i in feature:
    df1[i] = (df1[i] - min_max[i][0]) / (min_max[i][1] - min_max[i][0])
x = torch.tensor(np.array(df1))
x = x.to(torch.float32)
x = x.unsqueeze(0)
result = []
for folenum in range(folds):
    model.load_state_dict(torch.load(f'model-fold{folenum+1}.pt'))
    result.append(model(x).data.squeeze(1).tolist()[0] * (list(min_max.values())[0][1] - list(min_max.values())[0][0]) + list(min_max.values())[0][0])

print("预测结果:" + str(sum(result) / len(result)))

xqy2006's blog