前言
最近公司里遇到一个问题,在集群中一些websocket的消息丢失了。
产生问题的原理很简单,发送消息的服务和接收者连接的服务不是同一个服务。
解决方案
用中间件(mq, redis etc.)来在服务之间进行通信。
不直接发送websocket消息,而是将消息放在mq或者redis的list中。
并在redis中维护连接信息,服务根据连接信息来判断自己是否需要处理消息,或者将消息发给接收者连接的服务。
代码示例
我们的项目中使用的是Spring WebSocket,并且使用了STOMP协议,可以去官网查看文档。
代码示例只做维护连接信息的代码示例,因为其他部分都很容易想到。
维护连接信息的代码示例
想要在维护STOMP协议的连接信息,可以查看文档的这一部分Listening To ApplicationContext Events and Intercepting Messages
这里的连接信息只要是能够标识出不同的服务就OK。
一下是监听了订阅事件的Listener的部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package cn.fjhdtp.websocket.interceptor;
import java.util.Map;
import org.apache.commons.lang.StringUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {
@Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
Object loginBean = ...; attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);
return super.beforeHandshake(request, response, wsHandler, attributes); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package cn.fjhdtp.listener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import java.util.Map;
@Component public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
@Autowired @Qualifier("serversideMessageTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private IMessageHandler messageHandler;
@Override public void onApplicationEvent(SessionSubscribeEvent event) { String destination = (String) event.getMessage().getHeaders().get("simpDestination"); Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package cn.fjhdtp.message.listener;
import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import java.util.Map;
@Component public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {
@Override public void onApplicationEvent(SessionDisconnectEvent event) { Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
} }
|
当然,有些情况下可能不会正常的触发断开连接的事件(在was下就不会有这个事件),因此还会需要HeartBeat。