主题
字号
CHAPTER 05 ≈ 30 MIN READ

CNN 模型构建与迁移学习

经典LeNet模型实现

LeNet(LeNet-5)由Yann LeCun等人于1998年提出,是最早成功应用的卷积神经网络之一,被视为现代CNN的开山之作。下面的实现使用ReLU激活,并通过padding适配28x28的输入。

import torch
import torch.nn as nn

class LeNet(nn.Module):
    """LeNet-style CNN for single-channel images.

    Two convolution + ReLU + average-pooling stages feed a three-layer
    fully connected classifier that emits ``num_classes`` raw scores.
    The classifier's input width (16 * 5 * 5) fixes the expected input
    resolution at 28x28.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Feature extractor; the size notes assume a 28x28 input.
        conv_stages = [
            nn.Conv2d(1, 6, kernel_size=5, padding=2),   # 28x28 -> 28x28
            nn.ReLU(inplace=True),
            nn.AvgPool2d(kernel_size=2, stride=2),       # 28x28 -> 14x14
            nn.Conv2d(6, 16, kernel_size=5),             # 14x14 -> 10x10
            nn.ReLU(inplace=True),
            nn.AvgPool2d(kernel_size=2, stride=2),       # 10x10 -> 5x5
        ]
        self.features = nn.Sequential(*conv_stages)

        # Classifier head: 400 flattened features -> 120 -> 84 -> num_classes.
        flat_dim = 16 * 5 * 5
        dense_stages = [
            nn.Flatten(),
            nn.Linear(flat_dim, 120),
            nn.ReLU(inplace=True),
            nn.Linear(120, 84),
            nn.ReLU(inplace=True),
            nn.Linear(84, num_classes),
        ]
        self.classifier = nn.Sequential(*dense_stages)

    def forward(self, x):
        """Run the feature extractor, then the classifier, and return the scores."""
        return self.classifier(self.features(x))

# Smoke test: push one random 1x28x28 image through the network
model = LeNet(num_classes=10)
x = torch.randn(1, 1, 28, 28)
output = model(x)
print("输出形状:", output.shape)  # torch.Size([1, 10])

# Print the module structure
print(model)

LeNet架构图

输入 (1, 28, 28)
    ↓
Conv2d(1, 6, 5x5) + ReLU + AvgPool  →  (6, 14, 14)
    ↓
Conv2d(6, 16, 5x5) + ReLU + AvgPool  →  (16, 5, 5)
    ↓
Flatten  →  (400)
    ↓
Linear(400, 120) + ReLU  →  (120)
    ↓
Linear(120, 84) + ReLU  →  (84)
    ↓
Linear(84, 10)  →  (10)

AlexNet模型实现

AlexNet在2012年ImageNet竞赛中取得突破性成绩,标志着深度学习的复兴。

class AlexNet(nn.Module):
    """AlexNet-style CNN: five convolutions followed by a three-layer classifier.

    An adaptive average pool forces the feature map to 6x6 before the
    classifier, so the fully connected layers always see 256 * 6 * 6 inputs.
    The size notes below assume a 224x224 input.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        conv_stack = [
            # Conv1
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),  # 224 -> 55
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # 55 -> 27
            # Conv2
            nn.Conv2d(96, 256, kernel_size=5, padding=2),           # 27 -> 27
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # 27 -> 13
            # Conv3
            nn.Conv2d(256, 384, kernel_size=3, padding=1),          # 13 -> 13
            nn.ReLU(inplace=True),
            # Conv4
            nn.Conv2d(384, 384, kernel_size=3, padding=1),          # 13 -> 13
            nn.ReLU(inplace=True),
            # Conv5
            nn.Conv2d(384, 256, kernel_size=3, padding=1),          # 13 -> 13
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                  # 13 -> 6
        ]
        self.features = nn.Sequential(*conv_stack)

        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))

        hidden = 4096
        dense_stack = [
            nn.Dropout(0.5),
            nn.Linear(256 * 6 * 6, hidden),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, num_classes),
        ]
        self.classifier = nn.Sequential(*dense_stack)

    def forward(self, x):
        """Return class scores for a batch of 3-channel images."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch dimension and flatten everything after it.
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

# Smoke test: one random 3x224x224 image
model = AlexNet(num_classes=1000)
x = torch.randn(1, 3, 224, 224)
output = model(x)
print("输出形状:", output.shape)  # torch.Size([1, 1000])

# Parameter counts: all parameters vs. those with requires_grad=True
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")

VGG模型实现

VGG通过使用更小的3x3卷积核堆叠来增加网络深度,是当时最常用的特征提取网络。

class VGG(nn.Module):
    """VGG16-style network with batch normalisation after every convolution.

    Thirteen 3x3 convolutions are arranged in five blocks, each closed by a
    2x2 max pool. An adaptive average pool fixes the feature map at 7x7, and
    a three-layer classifier produces ``num_classes`` raw scores.

    Args:
        num_classes: Width of the final linear layer.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # VGG16 configuration: integers are conv output channels,
        # 'M' marks a max-pool. Size notes assume a 224x224 input.
        self.features = self._make_layers([
            64, 64, 'M',           # Block 1: 224 -> 112
            128, 128, 'M',         # Block 2: 112 -> 56
            256, 256, 256, 'M',    # Block 3: 56 -> 28
            512, 512, 512, 'M',    # Block 4: 28 -> 14
            512, 512, 512, 'M',    # Block 5: 14 -> 7
        ])

        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            # Input width must equal the 4096 outputs of the previous Linear;
            # any other value makes the forward pass fail with a shape error.
            nn.Linear(4096, num_classes),
        )

    def _make_layers(self, config):
        """Build the convolutional trunk from a configuration list.

        Args:
            config: Sequence whose items are either an int (output channels
                of a 3x3 conv + batch norm + ReLU group) or the string 'M'
                (a 2x2, stride-2 max pool).

        Returns:
            An ``nn.Sequential`` holding the layers in configuration order.
        """
        layers = []
        in_channels = 3

        for v in config:
            if v == 'M':
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            else:
                layers.append(nn.Conv2d(in_channels, v, kernel_size=3, padding=1))
                layers.append(nn.BatchNorm2d(v))
                layers.append(nn.ReLU(inplace=True))
                # The next convolution consumes what this one produced.
                in_channels = v

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class scores for a batch of 3-channel images."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = self.classifier(x)
        return x

# Smoke test: one random 3x224x224 image
model = VGG(num_classes=1000)
x = torch.randn(1, 3, 224, 224)
output = model(x)
print("输出形状:", output.shape)

# Parameter count
total_params = sum(p.numel() for p in model.parameters())
print(f"VGG16参数: {total_params:,}")

VGG变体

# VGG11, VGG13, VGG16, VGG19
# 数字表示卷积层+全连接层的总层数
# 常用VGG16(在准确率和参数量之间平衡较好)

ResNet残差网络

ResNet通过残差连接缓解了深层网络的梯度消失与退化问题,使训练更深的网络成为可能,是现代CNN的里程碑。

class ResidualBlock(nn.Module):
    """Basic residual block: two 3x3 conv + batch-norm layers plus a shortcut.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution; the second always uses 1.
        downsample: Optional module applied to the input before it is added
            back. When ``None`` the input itself is used as the shortcut.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Settings shared by both convolutions; each is followed by a batch norm.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)

        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(F(x) + shortcut(x))`` for the input batch ``x``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += shortcut  # residual connection
        return self.relu(out)


class ResNet(nn.Module):
    """ResNet-18-style classifier: a stem, four residual stages, and a linear head.

    Each stage holds two ``ResidualBlock`` modules. Stages 2-4 start with
    stride 2 and double the channel count.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages as (in_channels, out_channels, stride),
        # registered as layer1..layer4.
        stage_specs = [(64, 64, 1), (64, 128, 2), (128, 256, 2), (256, 512, 2)]
        for idx, (c_in, c_out, stride) in enumerate(stage_specs, start=1):
            setattr(self, f"layer{idx}", self._make_layer(c_in, c_out, blocks=2, stride=stride))

        # Global average pooling and the classifier.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block changes stride or channel count. It gets a
        1x1 conv + batch-norm shortcut whenever the input and output
        shapes would otherwise differ.
        """
        shortcut = None
        shape_preserved = stride == 1 and in_channels == out_channels
        if not shape_preserved:
            shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        stage = [ResidualBlock(in_channels, out_channels, stride, shortcut)]
        stage.extend(ResidualBlock(out_channels, out_channels) for _ in range(blocks - 1))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class scores for a batch of 3-channel images."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = self.avgpool(x)
        return self.fc(torch.flatten(pooled, 1))


# Smoke test: one random 3x224x224 image
model = ResNet(num_classes=1000)
x = torch.randn(1, 3, 224, 224)
output = model(x)
print("输出形状:", output.shape)

# Parameter count of the ResNet-18-style model
total_params = sum(p.numel() for p in model.parameters())
print(f"ResNet参数: {total_params:,}")

ResNet的优势

普通网络: 输出 = F(x)
残差网络: 输出 = F(x) + x

残差连接使得梯度可以直接反向传播到浅层
解决了深层网络梯度消失的问题

torchvision.models:预训练模型

PyTorch提供了在ImageNet上预训练的模型,可以直接下载使用。

import torchvision.models as models

# Load models with ImageNet-pretrained weights
# NOTE(review): torchvision 0.13+ documents `weights=...` as the replacement
# for `pretrained=`; confirm against the installed version.
resnet18 = models.resnet18(pretrained=True)
resnet50 = models.resnet50(pretrained=True)
vgg16 = models.vgg16(pretrained=True)
alexnet = models.alexnet(pretrained=True)

# Load the architecture with randomly initialised weights (for training from
# scratch; fine-tuning starts from pretrained weights instead)
resnet18_no_pretrain = models.resnet18(pretrained=False)

# Inspect the model structure
print(resnet18)

常用预训练模型

# ImageNet-pretrained models
models.resnet18(pretrained=True)    # 11.7M parameters
models.resnet34(pretrained=True)   # 21.8M parameters
models.resnet50(pretrained=True)   # 25.6M parameters
models.resnet101(pretrained=True)  # 44.5M parameters

models.vgg11(pretrained=True)       # 132.9M parameters
models.vgg13(pretrained=True)
models.vgg16(pretrained=True)      # 138.4M parameters

models.alexnet(pretrained=True)    # 61.1M parameters

# EfficientNet family
models.efficientnet_b0(pretrained=True)
models.efficientnet_b1(pretrained=True)

# MobileNet family (optimised for mobile devices)
models.mobilenet_v2(pretrained=True)
models.mobilenet_v3_small(pretrained=True)

# ViT (Vision Transformer)
models.vit_b_16(pretrained=True)    # needs comparatively large compute resources

特征提取:冻结主干网络

特征提取是最常用的迁移学习方法,将预训练模型作为固定特征提取器。

import torchvision.models as models

# Load the pretrained model
model = models.resnet18(pretrained=True)

# Freeze every layer (no parameter updates)
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer; the new layer is created after the
# freeze loop, so its parameters keep the default requires_grad=True
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)  # 10-class classification

# During training only the fc parameters are updated;
# all other parameters stay fixed

特征提取的完整示例

import torchvision.models as models
import torch.nn as nn

# 1. Load the pretrained model
feature_extractor = models.resnet18(pretrained=True)

# 2. Freeze all pretrained parameters
for param in feature_extractor.parameters():
    param.requires_grad = False

# 3. Replace the classification head. Read the input width from the existing
#    head instead of hard-coding 512, so the snippet also works for backbones
#    whose final feature width differs (e.g. resnet50).
in_features = feature_extractor.fc.in_features
feature_extractor.fc = nn.Sequential(
    nn.Dropout(0.3),
    nn.Linear(in_features, 256),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(256, 10)
)

# 4. Put BatchNorm layers in eval mode (optional; stops running-stat updates).
#    Calling feature_extractor.train() later switches them back to training
#    mode, so this loop has to be repeated after any such call.
for module in feature_extractor.modules():
    if isinstance(module, nn.BatchNorm2d):
        module.eval()

# 5. Optimise only the new head
optimizer = torch.optim.Adam(feature_extractor.fc.parameters(), lr=0.001)

# Training loop; `train_loader` and `criterion` are not defined in this
# snippet and must be provided by the surrounding code.
for images, labels in train_loader:
    outputs = feature_extractor(images)
    loss = criterion(outputs, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

微调:解冻部分层

微调(Fine-tuning)是在预训练模型基础上解冻部分层进行训练。

# Method 1: leave every layer trainable
model = models.resnet18(pretrained=True)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Method 2: train only the last layers
model = models.resnet18(pretrained=True)

# Freeze every parameter whose name contains neither 'layer4' nor 'fc'
for name, param in model.named_parameters():
    if 'layer4' not in name and 'fc' not in name:
        param.requires_grad = False

# Hand only the still-trainable parameters to the optimizer
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=0.001
)

# Method 3: different learning rates per parameter group
model = models.resnet18(pretrained=True)

# Base (shallow) layers use the smaller learning rate
base_params = []
base_layers = ['conv1', 'bn1', 'layer1', 'layer2']
for name, param in model.named_parameters():
    # Match on the top-level module name only. A substring test such as
    # `'conv1' in name` would also match 'layer3.0.conv1.weight' and pull
    # deep-layer parameters into the base group.
    top_level = name.split('.')[0]
    if top_level in base_layers:
        base_params.append(param)
        param.requires_grad = True
    elif top_level == 'fc':
        # The head is passed to the optimizer below, so it must stay
        # trainable; a frozen head would never be updated.
        param.requires_grad = True
    else:
        # Remaining stages (layer3, layer4) stay frozen and are not optimised.
        param.requires_grad = False

optimizer = torch.optim.Adam([
    {'params': base_params, 'lr': 1e-4},      # base layers: low learning rate
    {'params': model.fc.parameters(), 'lr': 1e-3}  # classifier head: high learning rate
])

模型保存与加载

import torch

# Method 1: save only the parameters (recommended)
torch.save(model.state_dict(), 'model.pth')

# Load: build the architecture first, then restore the saved parameters
# (`MyModel` is not defined in this snippet; it stands for the user's model class)
model = MyModel()
model.load_state_dict(torch.load('model.pth'))

# Method 2: save the whole model object
torch.save(model, 'model_full.pth')

# Load
# NOTE(review): torch.load unpickles the file, so only load files from trusted
# sources. Recent PyTorch releases default to weights_only=True, under which
# loading a fully pickled model may need weights_only=False; confirm against
# the installed version.
model = torch.load('model_full.pth')

# Method 3: save a checkpoint (to resume interrupted training)
checkpoint = {
    'epoch': 10,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': 0.5,
}
torch.save(checkpoint, 'checkpoint.pth')

# Load the checkpoint and resume from the epoch after the saved one
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch'] + 1

# Method 4: load a GPU-saved model onto the CPU
device = torch.device('cpu')
model = MyModel()
model.load_state_dict(torch.load('model.pth', map_location=device))

# Method 5: device-agnostic loading (use cuda:0 when available, otherwise CPU)
model = MyModel()
model.load_state_dict(torch.load('model.pth', map_location='cuda:0' if torch.cuda.is_available() else 'cpu'))