一、WebSocket 协议基础

1.1 HTTP vs WebSocket

特性HTTP/HTTPSWebSocket
连接方式请求-响应全双工长连接
连接建立每次请求重新建立一次握手,持久连接
服务器推送不支持(需轮询/SSE)原生支持
协议头大小较大(每次包含)小(2-10 字节)
实时性

1.2 WebSocket 握手过程

1. 客户端发起HTTP升级请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

2. 服务器响应升级
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

二、Spring Boot WebSocket 架构

2.1 整体架构图

┌─────────────────────────────────────────────────────┐
│                客户端 (Browser/Mobile)               │
└──────────────────────────┬──────────────────────────┘
                          │
                    WebSocket连接
                          │
┌─────────────────────────▼──────────────────────────┐
│             Spring Boot WebSocket Server           │
├────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────┐ │
│  │   Endpoint   │  │  Handler     │  │Intercept│ │
│  │  注册器      │  │  处理器       │  │ 拦截器   │ │
│  └──────┬───────┘  └──────┬───────┘  └────┬─────┘ │
│         │                  │                │     │
│  ┌──────▼──────────────────▼─────┐  ┌─────▼─────┐ │
│  │      Message Broker           │  │  Session  │ │
│  │     (Simple/External)         │  │  管理器    │ │
│  └──────┬──────────────────┬─────┘  └───────────┘ │
│         │                  │                       │
│  ┌──────▼──────┐  ┌───────▼────────┐             │
│  │ SubProtocol │  │ 消息转换器      │             │
│  │ (STOMP)     │  │ (Converter)     │             │
│  └─────────────┘  └─────────────────┘             │
└────────────────────────────────────────────────────┘

2.2 Spring Boot WebSocket 核心组件

1. WebSocketHandler

public interface WebSocketHandler {
    // 建立连接后调用
    void afterConnectionEstablished(WebSocketSession session) throws Exception;

    // 处理接收到的消息
    void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;

    // 处理传输错误
    void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;

    // 连接关闭后调用
    void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;

    // 是否处理消息分段
    boolean supportsPartialMessages();
}

2. WebSocketSession

public interface WebSocketSession {
    String getId();                    // 会话ID
    URI getUri();                      // 请求URI
    Map<String, Object> getAttributes(); // 会话属性
    Principal getPrincipal();          // 用户主体
    InetSocketAddress getLocalAddress(); // 本地地址
    InetSocketAddress getRemoteAddress(); // 远程地址
    void sendMessage(WebSocketMessage<?> message) throws IOException; // 发送消息
    void close() throws IOException;   // 关闭连接
    boolean isOpen();                  // 是否开启
}

三、STOMP 子协议实现

3.1 STOMP 消息帧格式

COMMAND
header1:value1
header2:value2

Body^@

3.2 Spring Boot STOMP 实现原理

// 核心组件关系
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    // 1. 消息代理配置
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // SimpleBroker: 内存消息代理
        // StompBrokerRelay: 外部消息代理(RabbitMQ, ActiveMQ)
    }

    // 2. 端点注册
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册WebSocket端点
    }
}

3.3 消息流转过程

客户端发送消息流程:
1. 客户端 -> CONNECT帧 -> WebSocket端点
2. 端点 -> 建立STOMP会话 -> SessionManager
3. 客户端 -> SEND帧 -> @Controller/@MessageMapping
4. 处理器 -> 返回消息 -> 消息代理
5. 消息代理 -> 路由到订阅者 -> 客户端

订阅消息流程:
1. 客户端 -> SUBSCRIBE帧 -> 消息代理
2. 消息代理 -> 记录订阅关系 -> SubscriptionRegistry
3. 有消息时 -> 查找订阅者 -> 发送消息

四、Spring Boot WebSocket 核心实现

4.1 自动配置原理

// Spring Boot 自动配置类
@AutoConfiguration
@ConditionalOnClass({WebSocketHandler.class, WebSocketMessageBrokerConfigurationSupport.class})
@ConditionalOnWebApplication(type = Type.SERVLET)
@EnableConfigurationProperties(WebSocketProperties.class)
public class WebSocketMessagingAutoConfiguration {

    // 注册WebSocket相关Bean
    @Bean
    @ConditionalOnMissingBean
    public WebSocketMessageBrokerConfigurationSupport webSocketMessageBrokerConfigurationSupport() {
        return new WebSocketMessageBrokerConfigurationSupport();
    }
}

4.2 消息处理链

// 核心处理器链
WebSocketHttpRequestHandler
    ↓
HandshakeHandler (DefaultHandshakeHandler)
    ↓
WebSocketService
    ↓
WebSocketHandlerDecorator
    ↓
SubProtocolWebSocketHandler
    ↓
StompSubProtocolHandler
    ↓
@MessageMapping 注解方法

4.3 Session 管理

// DefaultSimpUserRegistry
public class DefaultSimpUserRegistry implements SimpUserRegistry {
    // 用户会话映射
    private final ConcurrentMap<String, SimpUser> users = new ConcurrentHashMap<>();

    public SimpUser getUser(String userName) {
        return this.users.get(userName);
    }

    // 添加会话
    public void onApplicationEvent(SessionConnectEvent event) {
        String sessionId = event.getMessage().getHeaders().get("simpSessionId").toString();
        Principal user = event.getUser();
        // 注册用户和会话
    }
}

五、心跳机制实现

5.1 服务器端心跳

public class HeartbeatTask implements Runnable {
    private final WebSocketSession session;

    public void run() {
        while (session.isOpen()) {
            try {
                // 发送Ping消息
                session.sendMessage(new PingMessage());
                Thread.sleep(30000); // 30秒间隔
            } catch (Exception e) {
                break;
            }
        }
    }
}

5.2 客户端心跳检测

// 前端心跳检测
let heartbeatInterval = null;
let missedPings = 0;

function startHeartbeat() {
  heartbeatInterval = setInterval(() => {
    if (stompClient && stompClient.connected) {
      stompClient.send("/app/heartbeat", {}, "ping");
      missedPings++;

      if (missedPings > 3) {
        // 超过3次未响应,重连
        reconnect();
      }
    }
  }, 30000);
}

六、集群部署方案

6.1 使用外部消息代理

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 使用RabbitMQ作为外部消息代理
        registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("rabbitmq-host")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest")
            .setSystemLogin("guest")
            .setSystemPasscode("guest");

        registry.setApplicationDestinationPrefixes("/app");
    }
}

6.2 集群架构

        ┌─────────────────┐
        │    负载均衡器     │
        │   (Nginx/HAProxy)│
        └────────┬─────────┘
                 │
    ┌────────────┼────────────┐
    │            │            │
┌───▼──┐    ┌───▼──┐    ┌───▼──┐
│节点1  │    │节点2  │    │节点3  │
│Spring│    │Spring│    │Spring│
│Boot  │    │Boot  │    │Boot  │
└───┬──┘    └───┬──┘    └───┬──┘
    │            │            │
    └────────────┼────────────┘
                 │
          ┌─────▼─────┐
          │ 消息代理   │
          │ (Redis/   │
          │ RabbitMQ) │
          └───────────┘

七、消息路由机制

7.1 消息路由表

// 核心路由逻辑
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {

    // 消息路由处理
    protected void handleMessageInternal(Message<?> message) {
        String destination = getDestination(message);

        if (destination.startsWith("/topic/")) {
            // 广播消息
            broadcast(message);
        } else if (destination.startsWith("/queue/")) {
            // 队列消息
            sendToQueue(destination, message);
        } else if (destination.startsWith("/user/")) {
            // 用户消息
            sendToUser(destination, message);
        }
    }
}

7.2 消息转换流程

原始消息 → MessageConverter → 目标对象 → @MessageMapping方法
      ↑                               ↓
消息代理 ← 返回对象 ← MessageConverter ← 方法返回值

八、安全机制

8.1 认证与授权

@Configuration
@EnableWebSocketSecurity
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        // 配置消息安全
        messages
            .simpDestMatchers("/app/**").authenticated()  // 应用消息需要认证
            .simpSubscribeDestMatchers("/user/queue/errors").permitAll()
            .simpSubscribeDestMatchers("/topic/**").hasRole("USER")
            .anyMessage().denyAll();
    }

    @Override
    protected boolean sameOriginDisabled() {
        return true;  // 禁用同源策略
    }
}

8.2 CSRF 防护

@Configuration
public class WebSocketCsrfConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler")
            .setAllowedOrigins("*")
            .addInterceptors(new CsrfTokenHandshakeInterceptor());
    }
}

九、性能优化

9.1 连接池优化

# application.yml
server:
  tomcat:
    max-connections: 8192
    max-threads: 200
    min-spare-threads: 10
    accept-count: 100

spring:
  websocket:
    # 消息缓存大小
    send-buffer-size-limit: 512 * 1024
    # 消息时间限制
    send-time-limit: 10 * 1000

9.2 消息压缩

@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
    ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
    container.setMaxTextMessageBufferSize(8192);
    container.setMaxBinaryMessageBufferSize(8192);
    container.setMaxSessionIdleTimeout(600000L);

    // 启用压缩
    container.setAsyncSendTimeout(5000L);

    return container;
}

十、监控与调试

10.1 监控端点

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMonitoringConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 启用监控端点
        registry.enableSimpleBroker("/topic", "/queue", "/monitoring");

        // 注册监控处理器
        SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel);
        template.setMessageConverter(new MappingJackson2MessageConverter());

        // 定期发送监控数据
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            Map<String, Object> stats = getWebSocketStats();
            template.convertAndSend("/monitoring/stats", stats);
        }, 0, 5, TimeUnit.SECONDS);
    }
}

10.2 调试日志

logging:
  level:
    org.springframework.web.socket: DEBUG
    org.springframework.messaging: DEBUG
    org.springframework.messaging.simp: TRACE
    org.springframework.messaging.simp.stomp: TRACE

十一、故障排除

11.1 常见问题及解决方案

问题可能原因解决方案
连接失败跨域问题配置 AllowedOrigins
消息丢失网络不稳定实现消息确认机制
内存泄漏连接未关闭实现心跳和超时机制
性能下降消息堆积使用外部消息代理
集群同步节点间不同步使用共享 Session

11.2 连接状态监控

@Component
public class WebSocketSessionMonitor {

    @EventListener
    public void handleSessionConnectEvent(SessionConnectEvent event) {
        String sessionId = (String) event.getMessage().getHeaders().get("simpSessionId");
        log.info("Session connected: {}", sessionId);
    }

    @EventListener
    public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
        String sessionId = event.getSessionId();
        log.info("Session disconnected: {}", sessionId);
    }
}

十二、总结

Spring Boot WebSocket 核心特点:

  1. 协议升级:基于 HTTP 握手升级为 WebSocket 协议
  1. 消息代理:支持简单内存代理和外部消息代理
  1. STOMP 协议:使用 STOMP 子协议简化消息格式
  1. 注解驱动:通过注解简化消息处理
  1. 集群支持:通过外部消息代理实现集群
  1. 安全集成:与 Spring Security 无缝集成
  1. 事件驱动:提供完整的事件监听机制

工作流程总结:

  1. 连接建立:HTTP 握手 → 升级为 WebSocket → 建立 STOMP 会话
  1. 消息处理:客户端发送 → 消息代理路由 → 处理器处理 → 响应返回
  1. 订阅发布:客户端订阅主题 → 消息代理管理订阅 → 发布消息 → 推送订阅者
  1. 连接管理:心跳检测 → 会话管理 → 异常处理 → 连接回收

最佳实践:

  1. 生产环境使用外部消息代理(RabbitMQ/Redis)
  1. 实现完整的心跳和重连机制
  1. 配置合理的连接和消息限制
  1. 集成监控和日志
  1. 实现消息持久化
  1. 考虑消息顺序和去重

通过理解这些原理,可以更好地使用和优化 Spring Boot WebSocket,构建高性能的实时应用。