# MRS单任务卷积递归神经网络

# CRNN

# 单任务实现

import os
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F

def generate_cqt_spectrogram(file_path, resample_rate=5, max_length=100, hop_length=512):
    """Load an audio file and return a fixed-width CQT magnitude spectrogram.

    The constant-Q transform is computed with 168 bins (24 per octave,
    starting at C1), its magnitude is resampled along the time axis to
    ``resample_rate`` frames per second, and the result is zero-padded or
    truncated to exactly ``max_length`` frames.

    Args:
        file_path: Path of the audio file; it is loaded at its native
            sample rate.
        resample_rate: Target number of spectrogram frames per second.
        max_length: Number of time frames in the returned array.
        hop_length: Hop, in audio samples, between consecutive CQT frames.
            512 is the value ``librosa.cqt`` uses when none is given.
            NOTE(review): librosa restricts which hop lengths are valid for
            a given number of octaves -- check the ``librosa.cqt`` docs
            before changing it.

    Returns:
        A 2-D numpy array of shape ``(168, max_length)``.
    """
    y, sr = librosa.load(file_path, sr=None)
    cqt = librosa.cqt(
        y,
        sr=sr,
        hop_length=hop_length,
        fmin=librosa.note_to_hz('C1'),
        n_bins=168,
        bins_per_octave=24,
    )
    cqt_amplitude = np.abs(cqt)

    # The resampling runs along the frame axis, so the source rate is the CQT
    # frame rate (one column every ``hop_length`` samples), not the audio
    # sample rate.  Passing ``sr`` here shrank the time axis by an extra
    # factor of ``hop_length`` and left roughly a single column of data.
    frame_rate = sr / hop_length
    cqt_resampled = librosa.resample(
        cqt_amplitude, orig_sr=frame_rate, target_sr=resample_rate, axis=1
    )
    print("cqt_resampled shape:", cqt_resampled.shape)

    # Force a fixed width: zero-pad short clips on the right, crop long ones.
    n_frames = cqt_resampled.shape[1]
    if n_frames < max_length:
        pad_width = max_length - n_frames
        cqt_resampled = np.pad(cqt_resampled, ((0, 0), (0, pad_width)), 'constant')
    elif n_frames > max_length:
        cqt_resampled = cqt_resampled[:, :max_length]

    return cqt_resampled
# Output: a single spectrogram array.

class AudioDataset(Dataset):
    """Dataset yielding ``(spectrogram, label)`` pairs from a label table.

    Each row of ``df`` must provide an ``'audio'`` entry (path of the audio
    file) and a ``'System'`` entry (the label returned with the sample).

    Args:
        df: pandas DataFrame with at least the ``'audio'`` and ``'System'``
            columns.
        transform: Optional callable applied to the normalised spectrogram
            tensor before it is returned.  ``None`` (the default) returns
            the tensor unchanged.
    """

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the z-scored single-channel spectrogram and label of row ``idx``."""
        # Look the row up once and read both fields from it.
        row = self.df.iloc[idx]
        spectrogram = generate_cqt_spectrogram(row['audio'])
        # Add a leading channel axis so the sample is single-channel.
        spectrogram = np.expand_dims(spectrogram, axis=0)
        print("CQT Spectrogram shape:", spectrogram.shape)  # shape check: expect one channel
        eps = 1e-10  # guards against division by zero for constant input
        # Per-sample z-score normalisation.
        spectrogram = (spectrogram - np.mean(spectrogram)) / (np.std(spectrogram) + eps)
        sample = torch.from_numpy(spectrogram).float()
        # The transform used to be stored but silently ignored; apply it here.
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, row['System']

def custom_collate_fn(batch):
    """Collate ``(spectrogram, label)`` samples into two batch tensors.

    Args:
        batch: Sequence of ``(spectrogram, label)`` pairs.

    Returns:
        ``(spectrograms_padded, labels)``: the spectrograms combined with
        ``pad_sequence`` (batch dimension first, zero padding along each
        sample's leading dimension) and the labels gathered in one tensor.
    """
    # Transpose the list of pairs into one tuple per field.
    spec_items, label_items = zip(*batch)

    tensors = []
    for item in spec_items:
        tensors.append(torch.Tensor(item))

    pad = torch.nn.utils.rnn.pad_sequence
    padded = pad(tensors, batch_first=True, padding_value=0)
    targets = torch.tensor(label_items)

    print("Batch shape:", padded.shape)  # confirm the final batch shape
    return padded, targets


class CRNN(nn.Module):
    """Convolutional recurrent classifier for single-channel 2-D inputs.

    Three conv / batch-norm / ReLU stages, each followed by max pooling over
    the height axis only, feed a bidirectional LSTM that reads the feature
    map column by column along the width axis.  The LSTM output at the last
    step is mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
        input_height: Height of the input (168 matches the CQT bin count
            used elsewhere in this file).
        input_width: Width of the dummy input used to probe the conv output
            size.  ``forward`` itself accepts any width.
        rnn_hidden_size: Hidden size of each LSTM direction.
        rnn_num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, num_classes, input_height=168, input_width=100, rnn_hidden_size=128, rnn_num_layers=2):
        super().__init__()
        self.input_height = input_height
        self.input_width = input_width
        # Conv stack: 3x3 kernels with padding 1 keep the spatial size; each
        # pooling layer halves the height and leaves the width untouched.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        self.conv2 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn2 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        self.conv3 = nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn3 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))

        # Probe the conv stack once to learn the per-step feature size.
        # eval() + no_grad() keep the dummy pass from touching the batch-norm
        # running statistics or building an autograd graph.
        self._to_linear = None
        self.eval()
        with torch.no_grad():
            self._forward_conv(torch.zeros(1, 1, input_height, input_width))
        self.train()

        # Recurrent part: one step per feature-map column.
        self.lstm = nn.LSTM(input_size=self._to_linear, hidden_size=rnn_hidden_size, num_layers=rnn_num_layers, batch_first=True, bidirectional=True)
        # Classifier; the factor 2 accounts for the two LSTM directions.
        self.fc = nn.Linear(rnn_hidden_size * 2, num_classes)

    def _forward_conv(self, x):
        """Run the conv stack; input and output are (batch, channels, height, width)."""
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        if self._to_linear is None:
            # Features per step = channels * pooled height.  Width is the
            # sequence axis and must not be folded into the feature size.
            self._to_linear = x.shape[1] * x.shape[2]
        return x

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for input (batch, 1, height, width)."""
        x = self._forward_conv(x)
        batch_size, channels, height, width = x.shape
        # (batch, channels, height, width) -> (batch, width, channels * height):
        # every column becomes one LSTM step.  Flattening all three trailing
        # axes into the feature dimension left a sequence of length 1.
        x = x.permute(0, 3, 1, 2).reshape(batch_size, width, channels * height)
        x, _ = self.lstm(x)
        # Classify from the LSTM output at the last step.
        x = self.fc(x[:, -1, :])
        return x

# ---- Data preparation starts here ----
# Label sheet; audio paths are built relative to the folder that holds it.
file_path = './label.csv'
# file_path = r"D:\0-2024英文文献\0-代码部分\Code\label.csv"
audio_dir = os.path.dirname(file_path)
df = pd.read_csv(file_path, encoding='gbk')

# Replace the bare file name with a path under <audio_dir>/CNPM_audio and keep
# only that path plus the three label columns.
df = df[['File_Name', 'System', 'Tonic', 'Pattern']]
df['audio'] = df['File_Name'].map(lambda name: os.path.join(audio_dir, 'CNPM_audio', name))
df = df[['audio', 'System', 'Tonic', 'Pattern']]

# 60 / 20 / 20 split: hold out 20% for testing, then a quarter of the
# remaining 80% (0.25 x 0.8 = 0.2) for validation.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, random_state=42, test_size=0.25)

train_dataset, val_dataset, test_dataset = (
    AudioDataset(split) for split in (train_df, val_df, test_df)
)

# Only the training loader shuffles; all three share the custom collate function.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=custom_collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=custom_collate_fn,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=custom_collate_fn,
)

# Model, loss and optimiser; the GPU is used when one is available.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = CRNN(num_classes=12)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def train_model(model, data_loader, criterion, optimizer, device):
    """Run one training pass over ``data_loader`` and return the mean batch loss.

    Args:
        model: Module to train; it is switched to training mode.
        data_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimiser stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by the number of batches (each
        batch counts equally, whatever its size).
    """
    model.train()
    batch_losses = []
    for batch_inputs, batch_labels in data_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(data_loader)

def evaluate_model(model, data_loader, criterion, device):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        data_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(average_loss, accuracy)``: the per-batch losses averaged over the
        number of batches, and the fraction of samples whose highest-scoring
        class equals the label.
    """
    model.eval()
    loss_sum = 0
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the highest score in each row is the predicted class.
            predicted = logits.data.max(dim=1).indices
            hits += (predicted == batch_labels).sum().item()
            seen += batch_labels.shape[0]
    return loss_sum / len(data_loader), hits / seen

# Actual training and validation loop
epochs = 10
for epoch in range(epochs):
    # One pass over the training loader; the return value is the mean batch loss.
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    # Loss and accuracy on the validation loader after this epoch's update.
    val_loss, val_accuracy = evaluate_model(model, val_loader, criterion, device)
    # Epochs are reported 1-based.
    print(f"Epoch {epoch+1}, Train Loss: {train_loss}, Validation Loss: {val_loss}")
    # The " .4f" format spec reserves a leading space for the sign and prints four decimals.
    print(f"System Validation Accuracy: {val_accuracy: .4f}")