๐ WebSocket
Sprout์ WebSocket ๊ตฌํ์ RFC 6455 ์ฌ์์ ๋ฐ๋ผ ๋จ์ผ TCP ์ฐ๊ฒฐ์ ํตํ ์ ์ด์ค ํต์ ์ฑ๋์ ์ ๊ณตํฉ๋๋ค. NIO ๊ธฐ๋ฐ ์๋ฒ ์ํคํ ์ฒ์ ์๋ฒฝํ๊ฒ ํตํฉ๋์ด ๊ณ ์ฑ๋ฅ์ ํ์ฅ ๊ฐ๋ฅํ ์ค์๊ฐ ๋ฉ์์ง์ ์ ๊ณตํฉ๋๋ค.
๊ฐ์โ
Sprout์ WebSocket ๊ตฌํ์ ๋ค์์ ์ ๊ณตํฉ๋๋ค:
- RFC 6455 ์ค์: WebSocket ํ๋กํ ์ฝ ์ฌ์ ์์ ์ค์
- NIO ๊ธฐ๋ฐ ์ํคํ ์ฒ: ์ต๋ ํ์ฅ์ฑ์ ์ํ ๋ ผ๋ธ๋กํน I/O
- ์คํธ๋ฆฌ๋ฐ ํ๋ ์ ์ฒ๋ฆฌ: ๋ฉ๋ชจ๋ฆฌ ํจ์จ์ ์ธ ๋์ฉ๋ ๋ฉ์์ง ์ฒ๋ฆฌ
- ์ด๋ ธํ ์ด์ ๊ธฐ๋ฐ ํ๋ก๊ทธ๋๋ฐ: ์ ์ธ์ WebSocket ์๋ํฌ์ธํธ ์ ์
- ๋ผ์ดํ์ฌ์ดํด ๊ด๋ฆฌ: OnOpen, OnClose, OnError, ๋ฉ์์ง ํธ๋ค๋ง ์ฝ๋ฐฑ
- ๊ฒฝ๋ก ๋ณ์ ์ง์:
/ws/{userId}
์ ๊ฐ์ ๋์ ๊ฒฝ๋ก ํ๋ผ๋ฏธํฐ - ์ธ์ ๊ด๋ฆฌ: ์ค๋ ๋ ์์ ํ WebSocket ์ธ์ ์ถ์
WebSocket ์ํคํ ์ฒโ
ํต์ฌ ์ปดํฌ๋ํธโ
WebSocket ๊ตฌํ์ ๋ค์์ ์ฃผ์ ์ปดํฌ๋ํธ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค:
WebSocketProtocolHandler
: ํ๋กํ ์ฝ ๊ฐ์ง ๋ฐ ์ฐ๊ฒฐ ์ ๊ทธ๋ ์ด๋WebSocketHandshakeHandler
: RFC 6455 ํธ๋์ ฐ์ดํฌ ํ์WebSocketFrameParser
: ๋ง์คํน ์ง์์ ํฌํจํ ์คํธ๋ฆฌ๋ฐ ํ๋ ์ ํ์WebSocketFrameEncoder
: ์๋ฒ์์ ํด๋ผ์ด์ธํธ๋ก์ ๋ฉ์์ง๋ฅผ ์ํ ํ๋ ์ ์ธ์ฝ๋ฉWebSocketSession
: ์ฐ๊ฒฐ ๋ผ์ดํ์ฌ์ดํด ๋ฐ ๋ฉ์์ง ์ ์กWebSocketContainer
: ์ธ์ ๋ ์ง์คํธ๋ฆฌ ๋ฐ ๊ด๋ฆฌWebSocketEndpointRegistry
: ์๋ํฌ์ธํธ ๋งคํ ๋ฐ ํธ๋ค๋ฌ ๋ฑ๋ก
ํ๋กํ ์ฝ ๊ฐ์ง ํ๋ฆโ
// 1. NIO Selector๊ฐ ์๋ก์ด ์ฐ๊ฒฐ ๊ฐ์ง
@Component
public class WebSocketProtocolDetector implements ProtocolDetector {
@Override
public String detect(ByteBuffer buffer) throws Exception {
buffer.mark();
// HTTP ์์ฒญ ๋ผ์ธ ์ฝ๊ธฐ
byte[] bytes = new byte[Math.min(buffer.remaining(), 512)];
buffer.get(bytes);
buffer.reset();
String content = new String(bytes, StandardCharsets.UTF_8);
// WebSocket ์
๊ทธ๋ ์ด๋ ํค๋ ํ์ธ
if (content.contains("Upgrade: websocket") ||
content.contains("Upgrade: WebSocket")) {
return "WEBSOCKET";
}
return "UNKNOWN";
}
}
WebSocket ํธ๋์ ฐ์ดํฌ (RFC 6455 Section 4)โ
ํธ๋์ ฐ์ดํฌ ํ๋ก์ธ์คโ
ํธ๋์ ฐ์ดํฌ ๊ตฌํ์ RFC 6455๋ฅผ ์๊ฒฉํ ๋ฐ๋ฆ ๋๋ค:
@Component
public class DefaultWebSocketHandshakeHandler implements WebSocketHandshakeHandler {
private static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
@Override
public boolean performHandshake(HttpRequest<?> request, BufferedWriter out)
throws IOException {
// 1. ํ์ ํค๋ ๊ฒ์ฆ (RFC 6455 Section 4.2.1)
Map<String, String> headers = request.getHeaders();
String upgradeHeader = headers.get("Upgrade");
String connectionHeader = headers.get("Connection");
String secWebSocketKey = headers.get("Sec-WebSocket-Key");
String secWebSocketVersion = headers.get("Sec-WebSocket-Version");
// RFC 6455: ๋ฒ์ ์ ๋ฐ๋์ 13์ด์ด์ผ ํจ
if (!"websocket".equalsIgnoreCase(upgradeHeader) ||
!"Upgrade".equalsIgnoreCase(connectionHeader) ||
secWebSocketKey == null || secWebSocketKey.isBlank() ||
!"13".equals(secWebSocketVersion)) {
sendHandshakeErrorResponse(out, 400, "Bad Request",
"Invalid WebSocket handshake request headers.");
return false;
}
// 2. Sec-WebSocket-Accept ๊ฐ ๊ณ์ฐ (RFC 6455 Section 4.2.2)
String secWebSocketAccept = generateSecWebSocketAccept(secWebSocketKey);
// 3. 101 Switching Protocols ์๋ต ์ ์ก
out.write("HTTP/1.1 101 Switching Protocols\r\n");
out.write("Upgrade: websocket\r\n");
out.write("Connection: Upgrade\r\n");
out.write("Sec-WebSocket-Accept: " + secWebSocketAccept + "\r\n");
out.write("\r\n");
out.flush();
return true;
}
// RFC 6455 Section 4.2.2: Sec-WebSocket-Accept ๊ณ์ฐ
private String generateSecWebSocketAccept(String secWebSocketKey)
throws NoSuchAlgorithmException {
String combined = secWebSocketKey + WEBSOCKET_GUID;
MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
byte[] sha1Hash = sha1.digest(combined.getBytes(StandardCharsets.US_ASCII));
return Base64.getEncoder().encodeToString(sha1Hash);
}
}
RFC 6455 ์ค์:
- โ Section 4.1: ํด๋ผ์ด์ธํธ ํธ๋์ ฐ์ดํฌ ์๊ตฌ์ฌํญ ๊ฒ์ฆ
- โ Section 4.2: ์ ์ ํ ํค๋๋ฅผ ํฌํจํ ์๋ฒ ํธ๋์ ฐ์ดํฌ ์๋ต
- โ Section 4.2.2: SHA-1๊ณผ Base64๋ฅผ ์ฌ์ฉํ Sec-WebSocket-Accept ๊ณ์ฐ
ํ๋ ์ ๊ตฌ์กฐ (RFC 6455 Section 5)โ
ํ๋ ์ ํ์โ
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| | Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
์คํธ๋ฆฌ๋ฐ ํ๋ ์ ํ์โ
ํ์๋ ์ ์ฒด ํ์ด๋ก๋๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋ํ์ง ์๊ธฐ ์ํด ์คํธ๋ฆฌ๋ฐ์ ์ฌ์ฉํฉ๋๋ค:
@Component
public class DefaultWebSocketFrameParser implements WebSocketFrameParser {
@Override
public WebSocketFrame parse(InputStream in) throws Exception {
// 1. ์ฒ์ ๋ ๋ฐ์ดํธ ์ฝ๊ธฐ (RFC 6455 Section 5.2)
int b1 = in.read();
int b2 = in.read();
if (b1 == -1 || b2 == -1) {
throw new RuntimeException("Unexpected end of stream");
}
// 2. FIN๊ณผ opcode ํ์ฑ
boolean fin = (b1 & 0x80) != 0; // FIN ๋นํธ
int opcode = b1 & 0x0F; // Opcode (4๋นํธ)
// 3. MASK์ ํ์ด๋ก๋ ๊ธธ์ด ํ์ฑ
boolean masked = (b2 & 0x80) != 0;
int payloadLen = b2 & 0x7F;
// 4. ํ์ฅ ํ์ด๋ก๋ ๊ธธ์ด (RFC 6455 Section 5.2)
long actualPayloadLen;
if (payloadLen == 126) {
// 16๋นํธ ํ์ฅ ๊ธธ์ด
actualPayloadLen = ((in.read() & 0xFF) << 8) | (in.read() & 0xFF);
} else if (payloadLen == 127) {
// 64๋นํธ ํ์ฅ ๊ธธ์ด
actualPayloadLen = 0;
for (int i = 0; i < 8; i++) {
actualPayloadLen = (actualPayloadLen << 8) | (in.read() & 0xFF);
}
} else {
actualPayloadLen = payloadLen;
}
// 5. ๋ง์คํน ํค๊ฐ ์์ผ๋ฉด ์ฝ๊ธฐ (RFC 6455 Section 5.3)
byte[] maskingKey = new byte[4];
if (masked) {
if (in.read(maskingKey) != 4) {
throw new IOException("Failed to read masking key");
}
}
// 6. ์คํธ๋ฆฌ๋ฐ ํ์ด๋ก๋ ์
๋ ฅ ์์ฑ (์ ์ฒด ํ์ด๋ก๋ ๋ก๋ ๋ฐฉ์ง)
InputStream payloadInputStream = new LimitedInputStream(in, actualPayloadLen);
if (masked) {
// ๋ง์คํน ์๊ณ ๋ฆฌ์ฆ ์ ์ฉ (RFC 6455 Section 5.3)
payloadInputStream = new MaskingInputStream(payloadInputStream, maskingKey);
}
return new WebSocketFrame(fin, opcode, payloadInputStream);
}
}
RFC 6455 ์ค์:
- โ Section 5.2: FIN, RSV, opcode, MASK, ํ์ด๋ก๋ ๊ธธ์ด๋ฅผ ํฌํจํ ์ ์ ํ ๊ธฐ๋ณธ ํ๋ ์ ํ์ฑ
- โ Section 5.2: 16๋นํธ(126) ๋ฐ 64๋นํธ(127) ๊ธธ์ด๋ฅผ ์ํ ํ์ฅ ํ์ด๋ก๋ ๊ธธ์ด
- โ Section 5.3: ํด๋ผ์ด์ธํธ์์ ์๋ฒ๋ก์ ๋ง์คํน ์๊ตฌ์ฌํญ ๊ฐ์
- โ Section 5.5: ์ ์ด ํ๋ ์ ํ์ด๋ก๋ ๊ธธ์ด ์ ํ (โค 125๋ฐ์ดํธ)
๋ง์คํน ์๊ณ ๋ฆฌ์ฆ (RFC 6455 Section 5.3)โ
public class MaskingInputStream extends FilterInputStream {
private final byte[] maskingKey;
private long bytesRead;
@Override
public int read() throws IOException {
int r = super.read();
if (r != -1) {
// RFC 6455 Section 5.3: transformed-octet-i = original-octet-i XOR masking-key-octet-j
// where j = i MOD 4
int k = maskingKey[(int) (bytesRead & 3)] & 0xFF;
r = (r ^ k) & 0xFF;
bytesRead++;
}
return r;
}
}
RFC 6455 ์ค์:
- โ Section 5.3: 4๋ฐ์ดํธ ๋ง์คํน ํค๋ฅผ ์ฌ์ฉํ XOR ๋ง์คํน ์๊ณ ๋ฆฌ์ฆ
- โ Section 5.1: ํด ๋ผ์ด์ธํธ ํ๋ ์์ ๋ฐ๋์ ๋ง์คํน๋์ด์ผ ํ๋ฉฐ; ์๋ฒ ํ๋ ์์ ๋ง์คํน๋์ด์๋ ์ ๋จ
ํ๋ ์ ์ธ์ฝ๋ฉ (RFC 6455 Section 5)โ
ํ ์คํธ ํ๋ ์ ์ธ์ฝ๋ฉโ
@Component
public class DefaultWebSocketFrameEncoder implements WebSocketFrameEncoder {
@Override
public byte[] encodeText(String message) {
byte[] payload = message.getBytes(StandardCharsets.UTF_8);
int payloadLen = payload.length;
ByteArrayOutputStream frameStream = new ByteArrayOutputStream();
// ์ฒซ ๋ฒ์งธ ๋ฐ์ดํธ: FIN + opcode (ํ
์คํธ = 0x1)
frameStream.write(0x81); // FIN=1, RSV=000, Opcode=0001
// ๋ ๋ฒ์งธ ๋ฐ์ดํธ ๋ฐ ํ์ฅ ๊ธธ์ด
if (payloadLen < 126) {
frameStream.write((byte) payloadLen);
} else if (payloadLen <= 65535) {
frameStream.write(126);
frameStream.write((payloadLen >> 8) & 0xFF);
frameStream.write(payloadLen & 0xFF);
} else {
frameStream.write(127);
for (int i = 0; i < 8; i++) {
frameStream.write((byte) ((long)payloadLen >> (8 * (7 - i)) & 0xFF));
}
}
frameStream.write(payload);
return frameStream.toByteArray();
}
}
RFC 6455 ์ค์:
- โ Section 5.2: FIN, opcode, ํ์ด๋ก๋ ๊ธธ์ด๋ฅผ ํฌํจํ ์ ์ ํ ํ๋ ์ ๊ตฌ์กฐ
- โ Section 5.6: ํ ์คํธ ํ๋ ์์ UTF-8 ์ธ์ฝ๋ฉ๊ณผ ํจ๊ป opcode 0x1 ์ฌ์ฉ
- โ Section 5.7: ๋ฐ์ด๋๋ฆฌ ํ๋ ์์ opcode 0x2 ์ฌ์ฉ
์ ์ด ํ๋ ์ ์ธ์ฝ๋ฉโ
@Override
public byte[] encodeControlFrame(int opcode, byte[] payload) {
// RFC 6455 Section 5.5: ์ ์ด ํ๋ ์์ ํ์ด๋ก๋๊ฐ 125๋ฐ์ดํธ ์ดํ์ฌ์ผ ํจ
if (payload.length > 125) {
throw new IllegalArgumentException(
"Control frame payload too big (must be <= 125)"
);
}
byte[] frame = new byte[2 + payload.length];
frame[0] = (byte) (0x80 | opcode); // FIN + opcode
frame[1] = (byte) (payload.length); // ๋ง์คํฌ ์์, ๊ธธ์ด๋ง
System.arraycopy(payload, 0, frame, 2, payload.length);
return frame;
}
RFC 6455 ์ค์:
- โ Section 5.5: ์ ์ด ํ๋ ์ ํ์ด๋ก๋ ๊ธธ์ด ๊ฐ์ (โค 125๋ฐ์ดํธ)
- โ Section 5.5.1: ์ ํ์ ์ํ ์ฝ๋์ ์ด์ ๋ฅผ ํฌํจํ Close ํ๋ ์ (opcode 0x8)
- โ Section 5.5.2: Ping ํ๋ ์ (opcode 0x9)
- โ Section 5.5.3: Pong ํ๋ ์ (opcode 0xA)
WebSocket ์ธ์ ๊ด๋ฆฌโ
์ธ์ ๋ผ์ดํ์ฌ์ดํดโ
public class DefaultWebSocketSession implements WebSocketSession, WritableHandler {
private final String id;
private final SocketChannel channel;
private final Selector selector;
private final HttpRequest<?> handshakeRequest;
private final WebSocketEndpointInfo endpointInfo;
private final WebSocketFrameParser frameParser;
private final WebSocketFrameEncoder frameEncoder;
private volatile boolean open = true;
private volatile boolean isClosePending = false;
private final ByteBuffer readBuffer = ByteBuffer.allocate(65536);
private final Queue<ByteBuffer> pendingWrites = new ConcurrentLinkedQueue<>();
@Override
public void read(SelectionKey key) throws Exception {
// ์ฑ๋๋ก๋ถํฐ ๋
ผ๋ธ๋กํน ์ฝ๊ธฐ
int bytesRead = channel.read(readBuffer);
if (bytesRead == -1) {
callOnCloseMethod(CloseCodes.NO_STATUS_CODE);
close();
return;
}
readBuffer.flip();
// ๋ฒํผ๋ก๋ถํฐ ํ๋ ์ ํ์ฑ
while (readBuffer.remaining() > 0) {
readBuffer.mark();
InputStream frameInputStream = new ByteBufferInputStream(readBuffer);
try {
WebSocketFrame frame = frameParser.parse(frameInputStream);
processFrame(frame);
} catch (NotEnoughDataException e) {
// ๋ถ์์ ํ ํ๋ ์, ๋ ๋ง์ ๋ฐ์ดํฐ ๋๊ธฐ
readBuffer.reset();
break;
}
}
readBuffer.compact();
}
private void processFrame(WebSocketFrame frame) throws Exception {
if (WebSocketFrameDecoder.isCloseFrame(frame)) {
callOnCloseMethod(WebSocketFrameDecoder.getCloseCode(frame.getPayloadBytes()));
} else if (WebSocketFrameDecoder.isPingFrame(frame)) {
sendPong(frame.getPayloadBytes()); // RFC 6455 Section 5.5.2
} else if (WebSocketFrameDecoder.isPongFrame(frame)) {
// Pong ์์ (ํํธ๋นํธ ์๋ต)
} else if (WebSocketFrameDecoder.isDataFrame(frame)) {
dispatchMessage(frame);
}
}
}
RFC 6455 ์ค์:
- โ Section 5.5.2: Ping ํ๋ ์์ ๋ํ ์๋ Pong ์๋ต
- โ Section 5.5.1: ์ํ ์ฝ๋๋ฅผ ํฌํจํ ์ ์ ํ close ํ๋ ์ ์ฒ๋ฆฌ
- โ Section 7.1.4: ์ฐ๊ฒฐ ์ข ๋ฃ ์ํ์ค
๋ ผ๋ธ๋กํน ์ฐ๊ธฐ ์์ โ
@Override
public void sendText(String message) throws IOException {
scheduleWrite(ByteBuffer.wrap(frameEncoder.encodeText(message)));
}
private void scheduleWrite(ByteBuffer buf) {
pendingWrites.add(buf);
SelectionKey key = channel.keyFor(selector);
if (key != null && key.isValid() && (key.interestOps() & OP_WRITE) == 0) {
key.interestOps(key.interestOps() | OP_WRITE);
selector.wakeup();
}
}
@Override
public void write(SelectionKey key) throws Exception {
ByteBuffer buf;
while ((buf = pendingWrites.peek()) != null) {
channel.write(buf);
if (buf.hasRemaining()) return; // ๋ค์ ์ฐ๊ธฐ ์ด๋ฒคํธ ๋๊ธฐ
pendingWrites.poll();
}
if (pendingWrites.isEmpty()) {
key.interestOps(key.interestOps() & ~OP_WRITE);
// close๊ฐ ๋๊ธฐ ์ค์ด์๋ค๋ฉด ์ด์ ์ฑ๋ ์ข
๋ฃ
if (isClosePending && open) {
open = false;
channel.close();
}
}
}
Close ์ฝ๋ (RFC 6455 Section 7.4)โ
ํ์ค Close ์ฝ๋โ
public enum CloseCodes implements CloseCode {
NORMAL_CLOSURE(1000), // ์ ์ ์ข
๋ฃ
GOING_AWAY(1001), // ์๋ํฌ์ธํธ ์ข
๋ฃ
PROTOCOL_ERROR(1002), // ํ๋กํ ์ฝ ์ค๋ฅ
CANNOT_ACCEPT(1003), // ๋ฐ์ดํฐ ํ์
์ ๋ฐ์ ์ ์์
RESERVED(1004), // ์์ฝ๋จ
NO_STATUS_CODE(1005), // ์ํ ์ฝ๋ ์์
CLOSED_ABNORMALLY(1006), // ๋น์ ์ ์ข
๋ฃ
NOT_CONSISTENT(1007), // ์ ํจํ์ง ์์ UTF-8 ๋๋ ์๋ชป๋ ํ์
VIOLATED_POLICY(1008), // ์ ์ฑ
์๋ฐ
TOO_BIG(1009), // ๋ฉ์์ง๊ฐ ๋๋ฌด ํผ
NO_EXTENSION(1010), // ํ์ฅ์ด ํ์๋์ง ์์
UNEXPECTED_CONDITION(1011), // ์์์น ๋ชปํ ์๋ฒ ์ํ
SERVICE_RESTART(1012), // ์๋น์ค ์ฌ์์
TRY_AGAIN_LATER(1013), // ๋์ค์ ๋ค์ ์๋
TLS_HANDSHAKE_FAILURE(1015); // TLS ํธ๋์
ฐ์ดํฌ ์คํจ
public static CloseCode getCloseCode(int code) {
// RFC 6455 Section 7.4.2: ์ ํ๋ฆฌ์ผ์ด์
ํน์ ์ฝ๋ (3000-4999)
if (code >= 3000 && code < 5000) {
return new CloseCode() {
@Override
public int getCode() { return code; }
};
}
// ํ์ค close ์ฝ๋
return switch (code) {
case 1000 -> NORMAL_CLOSURE;
case 1001 -> GOING_AWAY;
// ... ๊ธฐํ ํ์ค ์ฝ๋
default -> throw new IllegalArgumentException("Invalid close code: " + code);
};
}
}
RFC 6455 ์ค์:
- โ Section 7.4.1: ๋ชจ๋ ํ์ค ์ํ ์ฝ๋ ๊ตฌํ (1000-1015)
- โ Section 7.4.2: ์ ํ๋ฆฌ์ผ์ด์ ํน์ ์ฝ๋ ์ง์ (3000-4999)
์ด๋ ธํ ์ด์ ๊ธฐ๋ฐ ์๋ํฌ์ธํธ ์ ์โ
WebSocket ํธ๋ค๋ฌโ
@WebSocketHandler("/ws/chat/{roomId}")
public class ChatWebSocketHandler {
@OnOpen
public void onOpen(@SocketSession WebSocketSession session,
@PathVariable("roomId") String roomId) {
System.out.println("New connection to room: " + roomId);
session.getUserProperties().put("roomId", roomId);
}
@MessageMapping("/message")
public void handleMessage(@Payload String message,
@SocketSession WebSocketSession session,
@PathVariable("roomId") String roomId) throws IOException {
// ๋ฐฉ์ ๋ชจ๋ ์ธ์
์ ๋ธ๋ก๋์บ์คํธ
String response = "[Room " + roomId + "] " + message;
session.sendText(response);
}
@OnClose
public void onClose(@SocketSession WebSocketSession session,
CloseCode closeCode) {
System.out.println("Connection closed: " + closeCode.getCode());
}
@OnError
public void onError(@SocketSession WebSocketSession session,
Throwable error) {
System.err.println("WebSocket error: " + error.getMessage());
}
}
์๋ํฌ์ธํธ ๋ฑ๋กโ
@Component
public class WebSocketEndpointRegistry {
private final Map<PathPattern, WebSocketEndpointInfo> endpointMappings =
new ConcurrentHashMap<>();
public WebSocketEndpointInfo getEndpointInfo(String path) {
for (Map.Entry<PathPattern, WebSocketEndpointInfo> entry :
endpointMappings.entrySet()) {
if (entry.getKey().matches(path)) {
return entry.getValue();
}
}
return null;
}
public void registerEndpoint(PathPattern pathPattern, Object handlerBean,
Method onOpenMethod, Method onCloseMethod,
Method onErrorMethod,
Map<String, Method> messageMappings) {
WebSocketEndpointInfo info = new WebSocketEndpointInfo(
pathPattern, handlerBean, onOpenMethod, onCloseMethod,
onErrorMethod, messageMappings
);
endpointMappings.put(pathPattern, info);
}
}
์ธ์ ์ปจํ ์ด๋โ
์ธ์ ๊ด๋ฆฌโ
@Component
public class DefaultWebSocketContainer implements WebSocketContainer, CloseListener {
// path -> (sessionId -> session)
private final Map<String, Map<String, WebSocketSession>> sessionStore =
new ConcurrentHashMap<>();
@Override
public void addSession(String path, WebSocketSession session) {
sessionStore.computeIfAbsent(path, k -> new ConcurrentHashMap<>())
.put(session.getId(), session);
}
@Override
public void removeSession(String path, String sessionId) {
Map<String, WebSocketSession> sessions = sessionStore.get(path);
if (sessions != null) {
sessions.remove(sessionId);
}
}
@Override
public Collection<WebSocketSession> getSessions(String path) {
return sessionStore.getOrDefault(path, Map.of()).values();
}
@Override
public void onSessionClosed(WebSocketSession session) {
this.removeSession(session.getRequestPath(), session.getId());
}
}