"""MRS single-task Convolutional Recurrent Neural Network (CRNN).

Author: Cendok
Category: MRS
Tags: [MRS]

Single-task CRNN implementation for MRS audio classification.
"""
import os
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F
def generate_cqt_spectrogram(file_path, resample_rate=5, max_length=100):
    """Load an audio file and return a fixed-size CQT magnitude spectrogram.

    Parameters
    ----------
    file_path : str
        Path to the audio file.
    resample_rate : int
        Target rate for resampling along the time (frame) axis.
    max_length : int
        Number of time frames kept; shorter results are zero-padded on the
        right, longer results are truncated.

    Returns
    -------
    np.ndarray
        Array of shape (168, max_length) with non-negative CQT magnitudes.
    """
    y, sr = librosa.load(file_path, sr=None)
    # 168 bins at 24 bins/octave = 7 octaves starting at C1.
    cqt = librosa.cqt(y, sr=sr, fmin=librosa.note_to_hz('C1'), n_bins=168, bins_per_octave=24)
    cqt_amplitude = np.abs(cqt)
    # NOTE(review): CQT frames are produced at sr / hop_length, not sr, so
    # orig_sr=sr may decimate the time axis far more than intended — confirm.
    cqt_resampled = librosa.resample(cqt_amplitude, orig_sr=sr, target_sr=resample_rate, axis=1)
    # Pad or truncate the time axis to exactly max_length frames.
    if cqt_resampled.shape[1] < max_length:
        pad_width = max_length - cqt_resampled.shape[1]
        cqt_resampled = np.pad(cqt_resampled, ((0, 0), (0, pad_width)), 'constant')
    elif cqt_resampled.shape[1] > max_length:
        cqt_resampled = cqt_resampled[:, :max_length]
    return cqt_resampled
# Emits one spectrogram (CQT magnitude map) per audio file.
class AudioDataset(Dataset):
    """Dataset yielding (normalized CQT spectrogram, 'System' label) pairs.

    Expects a DataFrame with at least an 'audio' column (file path) and a
    'System' column (class label).
    """

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform  # reserved for optional augmentation; currently unused

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        audio_path = self.df.iloc[idx]['audio']
        spectrogram = generate_cqt_spectrogram(audio_path)
        # Add a channel dimension -> (1, 168, T) for Conv2d input.
        spectrogram = np.expand_dims(spectrogram, axis=0)
        eps = 1e-10  # avoid division by zero on silent / constant clips
        # z-score normalization over the whole spectrogram.
        spectrogram = (spectrogram - np.mean(spectrogram)) / (np.std(spectrogram) + eps)
        label = self.df.iloc[idx]['System']
        return torch.from_numpy(spectrogram).float(), label
def custom_collate_fn(batch):
    """Collate (spectrogram, label) pairs into padded batch tensors.

    Spectrograms are zero-padded with ``pad_sequence`` so items of unequal
    leading length can be stacked; labels become a 1-D tensor.

    Returns
    -------
    (torch.Tensor, torch.Tensor)
        Padded spectrogram batch of shape (B, ...) and label tensor of shape (B,).
    """
    spectrograms, labels = zip(*batch)  # split spectrograms from labels
    # as_tensor avoids the forced float32 copy that torch.Tensor(...) makes.
    spectrograms = [torch.as_tensor(s) for s in spectrograms]
    spectrograms_padded = torch.nn.utils.rnn.pad_sequence(spectrograms, batch_first=True, padding_value=0)
    labels = torch.tensor(labels)
    return spectrograms_padded, labels
class CRNN(nn.Module):
    """Convolutional recurrent network for single-task MRS classification.

    The conv stack pools only along frequency (height), preserving time
    resolution; the pooled feature map is then read as a time sequence by a
    bidirectional LSTM and the last timestep is classified.
    """

    def __init__(self, num_classes, input_height=168, input_width=100, rnn_hidden_size=128, rnn_num_layers=2):
        super(CRNN, self).__init__()
        self.input_height = input_height
        self.input_width = input_width
        # Conv layers; (2, 1) pooling halves height only, keeping the time axis.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        self.conv2 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn2 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        self.conv3 = nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.bn3 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        # Infer per-timestep feature size with a dummy pass.
        # (torch.autograd.Variable is deprecated; a plain tensor suffices.)
        self._to_linear = None
        with torch.no_grad():
            self._forward_conv(torch.zeros(1, 1, input_height, input_width))
        # RNN over the time axis.
        self.lstm = nn.LSTM(input_size=self._to_linear, hidden_size=rnn_hidden_size,
                            num_layers=rnn_num_layers, batch_first=True, bidirectional=True)
        # Classifier; *2 because the LSTM is bidirectional.
        self.fc = nn.Linear(rnn_hidden_size * 2, num_classes)

    def _forward_conv(self, x):
        """Apply the conv stack; lazily record per-timestep feature size."""
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        if self._to_linear is None:
            # Features per timestep = channels * pooled height
            # (width is the sequence axis fed to the LSTM).
            self._to_linear = x.shape[1] * x.shape[2]
        return x

    def forward(self, x):
        # Convolutional feature extraction: (B, C, H, W).
        x = self._forward_conv(x)
        b, c, h, w = x.shape
        # BUG FIX: the original flattened C*H*W into a single timestep
        # (seq_len == 1), so the LSTM recurrence never ran. Treat width
        # (time) as the sequence: (B, W, C*H).
        x = x.permute(0, 3, 1, 2).reshape(b, w, c * h)
        x, (h_n, c_n) = self.lstm(x)
        # Classify from the last timestep's bidirectional output.
        return self.fc(x[:, -1, :])
# --- Data preparation (script entry) ---
file_path = './label.csv'
# file_path = r"D:\0-2024英文文献\0-代码部分\Code\label.csv"
df = pd.read_csv(file_path, encoding='gbk')  # gbk: label file uses a Chinese encoding
audio_dir = os.path.dirname(file_path)
df = df[['File_Name', 'System', 'Tonic', 'Pattern']]
# Build audio paths as <csv dir>/CNPM_audio/<File_Name>.
df['audio'] = df['File_Name'].apply(lambda x: os.path.join(audio_dir, 'CNPM_audio', x))
df = df[['audio', 'System', 'Tonic', 'Pattern']]
# 60/20/20 train/val/test split: 0.25 of the remaining 80% = 20%.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, random_state=42)
train_dataset = AudioDataset(train_df)
val_dataset = AudioDataset(val_df)
test_dataset = AudioDataset(test_df)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate_fn)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CRNN(num_classes=12).to(device)  # assumes 12 'System' classes — TODO confirm against label.csv
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
def train_model(model, data_loader, criterion, optimizer, device):
    """Run one training epoch over ``data_loader``.

    Returns the mean per-batch loss for the epoch.
    """
    model.train()
    running_loss = 0.0
    for batch_inputs, batch_labels in data_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_labels)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()
    return running_loss / len(data_loader)
def evaluate_model(model, data_loader, criterion, device):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Returns a tuple ``(mean per-batch loss, overall accuracy)``.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss_sum += criterion(outputs, labels).item()
            predictions = outputs.data.argmax(dim=1)  # highest-scoring class per sample
            correct += (predictions == labels).sum().item()
            seen += labels.size(0)
    return loss_sum / len(data_loader), correct / seen
# --- Actual training and validation loop ---
epochs = 10
for epoch in range(epochs):
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate_model(model, val_loader, criterion, device)
    print(f"Epoch {epoch+1}, Train Loss: {train_loss}, Validation Loss: {val_loss}")
    print(f"System Validation Accuracy: {val_accuracy: .4f}")