๋ณธ๋ฌธ์œผ๋กœ ๊ฑด๋„ˆ๋›ฐ๊ธฐ

๐Ÿ”Œ WebSocket

Sprout์˜ WebSocket ๊ตฌํ˜„์€ RFC 6455 ์‚ฌ์–‘์„ ๋”ฐ๋ผ ๋‹จ์ผ TCP ์—ฐ๊ฒฐ์„ ํ†ตํ•œ ์ „์ด์ค‘ ํ†ต์‹  ์ฑ„๋„์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. NIO ๊ธฐ๋ฐ˜ ์„œ๋ฒ„ ์•„ํ‚คํ…์ฒ˜์™€ ์™„๋ฒฝํ•˜๊ฒŒ ํ†ตํ•ฉ๋˜์–ด ๊ณ ์„ฑ๋Šฅ์˜ ํ™•์žฅ ๊ฐ€๋Šฅํ•œ ์‹ค์‹œ๊ฐ„ ๋ฉ”์‹œ์ง•์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

๊ฐœ์š”โ€‹

Sprout์˜ WebSocket ๊ตฌํ˜„์€ ๋‹ค์Œ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค:

  • RFC 6455 ์ค€์ˆ˜: WebSocket ํ”„๋กœํ† ์ฝœ ์‚ฌ์–‘ ์™„์ „ ์ค€์ˆ˜
  • NIO ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜: ์ตœ๋Œ€ ํ™•์žฅ์„ฑ์„ ์œ„ํ•œ ๋…ผ๋ธ”๋กœํ‚น I/O
  • ์ŠคํŠธ๋ฆฌ๋ฐ ํ”„๋ ˆ์ž„ ์ฒ˜๋ฆฌ: ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์ ์ธ ๋Œ€์šฉ๋Ÿ‰ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
  • ์–ด๋…ธํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ: ์„ ์–ธ์  WebSocket ์—”๋“œํฌ์ธํŠธ ์ •์˜
  • ๋ผ์ดํ”„์‚ฌ์ดํด ๊ด€๋ฆฌ: OnOpen, OnClose, OnError, ๋ฉ”์‹œ์ง€ ํ•ธ๋“ค๋ง ์ฝœ๋ฐฑ
  • ๊ฒฝ๋กœ ๋ณ€์ˆ˜ ์ง€์›: /ws/{userId}์™€ ๊ฐ™์€ ๋™์  ๊ฒฝ๋กœ ํŒŒ๋ผ๋ฏธํ„ฐ
  • ์„ธ์…˜ ๊ด€๋ฆฌ: ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•œ WebSocket ์„ธ์…˜ ์ถ”์ 

WebSocket ์•„ํ‚คํ…์ฒ˜โ€‹

ํ•ต์‹ฌ ์ปดํฌ๋„ŒํŠธโ€‹

WebSocket ๊ตฌํ˜„์€ ๋‹ค์Œ์˜ ์ฃผ์š” ์ปดํฌ๋„ŒํŠธ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค:

  • WebSocketProtocolHandler: ํ”„๋กœํ† ์ฝœ ๊ฐ์ง€ ๋ฐ ์—ฐ๊ฒฐ ์—…๊ทธ๋ ˆ์ด๋“œ
  • WebSocketHandshakeHandler: RFC 6455 ํ•ธ๋“œ์…ฐ์ดํฌ ํ˜‘์ƒ
  • WebSocketFrameParser: ๋งˆ์Šคํ‚น ์ง€์›์„ ํฌํ•จํ•œ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”„๋ ˆ์ž„ ํŒŒ์„œ
  • WebSocketFrameEncoder: ์„œ๋ฒ„์—์„œ ํด๋ผ์ด์–ธํŠธ๋กœ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ์œ„ํ•œ ํ”„๋ ˆ์ž„ ์ธ์ฝ”๋”ฉ
  • WebSocketSession: ์—ฐ๊ฒฐ ๋ผ์ดํ”„์‚ฌ์ดํด ๋ฐ ๋ฉ”์‹œ์ง€ ์ „์†ก
  • WebSocketContainer: ์„ธ์…˜ ๋ ˆ์ง€์ŠคํŠธ๋ฆฌ ๋ฐ ๊ด€๋ฆฌ
  • WebSocketEndpointRegistry: ์—”๋“œํฌ์ธํŠธ ๋งคํ•‘ ๋ฐ ํ•ธ๋“ค๋Ÿฌ ๋“ฑ๋ก

ํ”„๋กœํ† ์ฝœ ๊ฐ์ง€ ํ๋ฆ„โ€‹

// 1. NIO Selector๊ฐ€ ์ƒˆ๋กœ์šด ์—ฐ๊ฒฐ ๊ฐ์ง€
@Component
public class WebSocketProtocolDetector implements ProtocolDetector {
@Override
public String detect(ByteBuffer buffer) throws Exception {
buffer.mark();

// HTTP ์š”์ฒญ ๋ผ์ธ ์ฝ๊ธฐ
byte[] bytes = new byte[Math.min(buffer.remaining(), 512)];
buffer.get(bytes);
buffer.reset();

String content = new String(bytes, StandardCharsets.UTF_8);

// WebSocket ์—…๊ทธ๋ ˆ์ด๋“œ ํ—ค๋” ํ™•์ธ
if (content.contains("Upgrade: websocket") ||
content.contains("Upgrade: WebSocket")) {
return "WEBSOCKET";
}

return "UNKNOWN";
}
}

WebSocket ํ•ธ๋“œ์…ฐ์ดํฌ (RFC 6455 Section 4)โ€‹

ํ•ธ๋“œ์…ฐ์ดํฌ ํ”„๋กœ์„ธ์Šคโ€‹

ํ•ธ๋“œ์…ฐ์ดํฌ ๊ตฌํ˜„์€ RFC 6455๋ฅผ ์—„๊ฒฉํžˆ ๋”ฐ๋ฆ…๋‹ˆ๋‹ค:

@Component
public class DefaultWebSocketHandshakeHandler implements WebSocketHandshakeHandler {
private static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

@Override
public boolean performHandshake(HttpRequest<?> request, BufferedWriter out)
throws IOException {
// 1. ํ•„์ˆ˜ ํ—ค๋” ๊ฒ€์ฆ (RFC 6455 Section 4.2.1)
Map<String, String> headers = request.getHeaders();
String upgradeHeader = headers.get("Upgrade");
String connectionHeader = headers.get("Connection");
String secWebSocketKey = headers.get("Sec-WebSocket-Key");
String secWebSocketVersion = headers.get("Sec-WebSocket-Version");

// RFC 6455: ๋ฒ„์ „์€ ๋ฐ˜๋“œ์‹œ 13์ด์–ด์•ผ ํ•จ
if (!"websocket".equalsIgnoreCase(upgradeHeader) ||
!"Upgrade".equalsIgnoreCase(connectionHeader) ||
secWebSocketKey == null || secWebSocketKey.isBlank() ||
!"13".equals(secWebSocketVersion)) {
sendHandshakeErrorResponse(out, 400, "Bad Request",
"Invalid WebSocket handshake request headers.");
return false;
}

// 2. Sec-WebSocket-Accept ๊ฐ’ ๊ณ„์‚ฐ (RFC 6455 Section 4.2.2)
String secWebSocketAccept = generateSecWebSocketAccept(secWebSocketKey);

// 3. 101 Switching Protocols ์‘๋‹ต ์ „์†ก
out.write("HTTP/1.1 101 Switching Protocols\r\n");
out.write("Upgrade: websocket\r\n");
out.write("Connection: Upgrade\r\n");
out.write("Sec-WebSocket-Accept: " + secWebSocketAccept + "\r\n");
out.write("\r\n");
out.flush();

return true;
}

// RFC 6455 Section 4.2.2: Sec-WebSocket-Accept ๊ณ„์‚ฐ
private String generateSecWebSocketAccept(String secWebSocketKey)
throws NoSuchAlgorithmException {
String combined = secWebSocketKey + WEBSOCKET_GUID;
MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
byte[] sha1Hash = sha1.digest(combined.getBytes(StandardCharsets.US_ASCII));
return Base64.getEncoder().encodeToString(sha1Hash);
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 4.1: ํด๋ผ์ด์–ธํŠธ ํ•ธ๋“œ์…ฐ์ดํฌ ์š”๊ตฌ์‚ฌํ•ญ ๊ฒ€์ฆ
  • โœ… Section 4.2: ์ ์ ˆํ•œ ํ—ค๋”๋ฅผ ํฌํ•จํ•œ ์„œ๋ฒ„ ํ•ธ๋“œ์…ฐ์ดํฌ ์‘๋‹ต
  • โœ… Section 4.2.2: SHA-1๊ณผ Base64๋ฅผ ์‚ฌ์šฉํ•œ Sec-WebSocket-Accept ๊ณ„์‚ฐ

ํ”„๋ ˆ์ž„ ๊ตฌ์กฐ (RFC 6455 Section 5)โ€‹

ํ”„๋ ˆ์ž„ ํ˜•์‹โ€‹

 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| | Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+

์ŠคํŠธ๋ฆฌ๋ฐ ํ”„๋ ˆ์ž„ ํŒŒ์„œโ€‹

ํŒŒ์„œ๋Š” ์ „์ฒด ํŽ˜์ด๋กœ๋“œ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ๋กœ๋“œํ•˜์ง€ ์•Š๊ธฐ ์œ„ํ•ด ์ŠคํŠธ๋ฆฌ๋ฐ์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค:

@Component
public class DefaultWebSocketFrameParser implements WebSocketFrameParser {
@Override
public WebSocketFrame parse(InputStream in) throws Exception {
// 1. ์ฒ˜์Œ ๋‘ ๋ฐ”์ดํŠธ ์ฝ๊ธฐ (RFC 6455 Section 5.2)
int b1 = in.read();
int b2 = in.read();

if (b1 == -1 || b2 == -1) {
throw new RuntimeException("Unexpected end of stream");
}

// 2. FIN๊ณผ opcode ํŒŒ์‹ฑ
boolean fin = (b1 & 0x80) != 0; // FIN ๋น„ํŠธ
int opcode = b1 & 0x0F; // Opcode (4๋น„ํŠธ)

// 3. MASK์™€ ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด ํŒŒ์‹ฑ
boolean masked = (b2 & 0x80) != 0;
int payloadLen = b2 & 0x7F;

// 4. ํ™•์žฅ ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด (RFC 6455 Section 5.2)
long actualPayloadLen;
if (payloadLen == 126) {
// 16๋น„ํŠธ ํ™•์žฅ ๊ธธ์ด
actualPayloadLen = ((in.read() & 0xFF) << 8) | (in.read() & 0xFF);
} else if (payloadLen == 127) {
// 64๋น„ํŠธ ํ™•์žฅ ๊ธธ์ด
actualPayloadLen = 0;
for (int i = 0; i < 8; i++) {
actualPayloadLen = (actualPayloadLen << 8) | (in.read() & 0xFF);
}
} else {
actualPayloadLen = payloadLen;
}

// 5. ๋งˆ์Šคํ‚น ํ‚ค๊ฐ€ ์žˆ์œผ๋ฉด ์ฝ๊ธฐ (RFC 6455 Section 5.3)
byte[] maskingKey = new byte[4];
if (masked) {
if (in.read(maskingKey) != 4) {
throw new IOException("Failed to read masking key");
}
}

// 6. ์ŠคํŠธ๋ฆฌ๋ฐ ํŽ˜์ด๋กœ๋“œ ์ž…๋ ฅ ์ƒ์„ฑ (์ „์ฒด ํŽ˜์ด๋กœ๋“œ ๋กœ๋“œ ๋ฐฉ์ง€)
InputStream payloadInputStream = new LimitedInputStream(in, actualPayloadLen);

if (masked) {
// ๋งˆ์Šคํ‚น ์•Œ๊ณ ๋ฆฌ์ฆ˜ ์ ์šฉ (RFC 6455 Section 5.3)
payloadInputStream = new MaskingInputStream(payloadInputStream, maskingKey);
}

return new WebSocketFrame(fin, opcode, payloadInputStream);
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 5.2: FIN, RSV, opcode, MASK, ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด๋ฅผ ํฌํ•จํ•œ ์ ์ ˆํ•œ ๊ธฐ๋ณธ ํ”„๋ ˆ์ž„ ํŒŒ์‹ฑ
  • โœ… Section 5.2: 16๋น„ํŠธ(126) ๋ฐ 64๋น„ํŠธ(127) ๊ธธ์ด๋ฅผ ์œ„ํ•œ ํ™•์žฅ ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด
  • โœ… Section 5.3: ํด๋ผ์ด์–ธํŠธ์—์„œ ์„œ๋ฒ„๋กœ์˜ ๋งˆ์Šคํ‚น ์š”๊ตฌ์‚ฌํ•ญ ๊ฐ•์ œ
  • โœ… Section 5.5: ์ œ์–ด ํ”„๋ ˆ์ž„ ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด ์ œํ•œ (โ‰ค 125๋ฐ”์ดํŠธ)

๋งˆ์Šคํ‚น ์•Œ๊ณ ๋ฆฌ์ฆ˜ (RFC 6455 Section 5.3)โ€‹

public class MaskingInputStream extends FilterInputStream {
private final byte[] maskingKey;
private long bytesRead;

@Override
public int read() throws IOException {
int r = super.read();
if (r != -1) {
// RFC 6455 Section 5.3: transformed-octet-i = original-octet-i XOR masking-key-octet-j
// where j = i MOD 4
int k = maskingKey[(int) (bytesRead & 3)] & 0xFF;
r = (r ^ k) & 0xFF;
bytesRead++;
}
return r;
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 5.3: 4๋ฐ”์ดํŠธ ๋งˆ์Šคํ‚น ํ‚ค๋ฅผ ์‚ฌ์šฉํ•œ XOR ๋งˆ์Šคํ‚น ์•Œ๊ณ ๋ฆฌ์ฆ˜
  • โœ… Section 5.1: ํด๋ผ์ด์–ธํŠธ ํ”„๋ ˆ์ž„์€ ๋ฐ˜๋“œ์‹œ ๋งˆ์Šคํ‚น๋˜์–ด์•ผ ํ•˜๋ฉฐ; ์„œ๋ฒ„ ํ”„๋ ˆ์ž„์€ ๋งˆ์Šคํ‚น๋˜์–ด์„œ๋Š” ์•ˆ ๋จ

ํ”„๋ ˆ์ž„ ์ธ์ฝ”๋”ฉ (RFC 6455 Section 5)โ€‹

ํ…์ŠคํŠธ ํ”„๋ ˆ์ž„ ์ธ์ฝ”๋”ฉโ€‹

@Component
public class DefaultWebSocketFrameEncoder implements WebSocketFrameEncoder {
@Override
public byte[] encodeText(String message) {
byte[] payload = message.getBytes(StandardCharsets.UTF_8);
int payloadLen = payload.length;

ByteArrayOutputStream frameStream = new ByteArrayOutputStream();

// ์ฒซ ๋ฒˆ์งธ ๋ฐ”์ดํŠธ: FIN + opcode (ํ…์ŠคํŠธ = 0x1)
frameStream.write(0x81); // FIN=1, RSV=000, Opcode=0001

// ๋‘ ๋ฒˆ์งธ ๋ฐ”์ดํŠธ ๋ฐ ํ™•์žฅ ๊ธธ์ด
if (payloadLen < 126) {
frameStream.write((byte) payloadLen);
} else if (payloadLen <= 65535) {
frameStream.write(126);
frameStream.write((payloadLen >> 8) & 0xFF);
frameStream.write(payloadLen & 0xFF);
} else {
frameStream.write(127);
for (int i = 0; i < 8; i++) {
frameStream.write((byte) ((long)payloadLen >> (8 * (7 - i)) & 0xFF));
}
}

frameStream.write(payload);
return frameStream.toByteArray();
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 5.2: FIN, opcode, ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด๋ฅผ ํฌํ•จํ•œ ์ ์ ˆํ•œ ํ”„๋ ˆ์ž„ ๊ตฌ์กฐ
  • โœ… Section 5.6: ํ…์ŠคํŠธ ํ”„๋ ˆ์ž„์€ UTF-8 ์ธ์ฝ”๋”ฉ๊ณผ ํ•จ๊ป˜ opcode 0x1 ์‚ฌ์šฉ
  • โœ… Section 5.7: ๋ฐ”์ด๋„ˆ๋ฆฌ ํ”„๋ ˆ์ž„์€ opcode 0x2 ์‚ฌ์šฉ

์ œ์–ด ํ”„๋ ˆ์ž„ ์ธ์ฝ”๋”ฉโ€‹

@Override
public byte[] encodeControlFrame(int opcode, byte[] payload) {
// RFC 6455 Section 5.5: ์ œ์–ด ํ”„๋ ˆ์ž„์€ ํŽ˜์ด๋กœ๋“œ๊ฐ€ 125๋ฐ”์ดํŠธ ์ดํ•˜์—ฌ์•ผ ํ•จ
if (payload.length > 125) {
throw new IllegalArgumentException(
"Control frame payload too big (must be <= 125)"
);
}

byte[] frame = new byte[2 + payload.length];
frame[0] = (byte) (0x80 | opcode); // FIN + opcode
frame[1] = (byte) (payload.length); // ๋งˆ์Šคํฌ ์—†์Œ, ๊ธธ์ด๋งŒ
System.arraycopy(payload, 0, frame, 2, payload.length);
return frame;
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 5.5: ์ œ์–ด ํ”„๋ ˆ์ž„ ํŽ˜์ด๋กœ๋“œ ๊ธธ์ด ๊ฐ•์ œ (โ‰ค 125๋ฐ”์ดํŠธ)
  • โœ… Section 5.5.1: ์„ ํƒ์  ์ƒํƒœ ์ฝ”๋“œ์™€ ์ด์œ ๋ฅผ ํฌํ•จํ•œ Close ํ”„๋ ˆ์ž„ (opcode 0x8)
  • โœ… Section 5.5.2: Ping ํ”„๋ ˆ์ž„ (opcode 0x9)
  • โœ… Section 5.5.3: Pong ํ”„๋ ˆ์ž„ (opcode 0xA)

WebSocket ์„ธ์…˜ ๊ด€๋ฆฌโ€‹

์„ธ์…˜ ๋ผ์ดํ”„์‚ฌ์ดํดโ€‹

public class DefaultWebSocketSession implements WebSocketSession, WritableHandler {
private final String id;
private final SocketChannel channel;
private final Selector selector;
private final HttpRequest<?> handshakeRequest;
private final WebSocketEndpointInfo endpointInfo;
private final WebSocketFrameParser frameParser;
private final WebSocketFrameEncoder frameEncoder;

private volatile boolean open = true;
private volatile boolean isClosePending = false;
private final ByteBuffer readBuffer = ByteBuffer.allocate(65536);
private final Queue<ByteBuffer> pendingWrites = new ConcurrentLinkedQueue<>();

@Override
public void read(SelectionKey key) throws Exception {
// ์ฑ„๋„๋กœ๋ถ€ํ„ฐ ๋…ผ๋ธ”๋กœํ‚น ์ฝ๊ธฐ
int bytesRead = channel.read(readBuffer);
if (bytesRead == -1) {
callOnCloseMethod(CloseCodes.NO_STATUS_CODE);
close();
return;
}

readBuffer.flip();

// ๋ฒ„ํผ๋กœ๋ถ€ํ„ฐ ํ”„๋ ˆ์ž„ ํŒŒ์‹ฑ
while (readBuffer.remaining() > 0) {
readBuffer.mark();

InputStream frameInputStream = new ByteBufferInputStream(readBuffer);

try {
WebSocketFrame frame = frameParser.parse(frameInputStream);
processFrame(frame);
} catch (NotEnoughDataException e) {
// ๋ถˆ์™„์ „ํ•œ ํ”„๋ ˆ์ž„, ๋” ๋งŽ์€ ๋ฐ์ดํ„ฐ ๋Œ€๊ธฐ
readBuffer.reset();
break;
}
}

readBuffer.compact();
}

private void processFrame(WebSocketFrame frame) throws Exception {
if (WebSocketFrameDecoder.isCloseFrame(frame)) {
callOnCloseMethod(WebSocketFrameDecoder.getCloseCode(frame.getPayloadBytes()));
} else if (WebSocketFrameDecoder.isPingFrame(frame)) {
sendPong(frame.getPayloadBytes()); // RFC 6455 Section 5.5.2
} else if (WebSocketFrameDecoder.isPongFrame(frame)) {
// Pong ์ˆ˜์‹  (ํ•˜ํŠธ๋น„ํŠธ ์‘๋‹ต)
} else if (WebSocketFrameDecoder.isDataFrame(frame)) {
dispatchMessage(frame);
}
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 5.5.2: Ping ํ”„๋ ˆ์ž„์— ๋Œ€ํ•œ ์ž๋™ Pong ์‘๋‹ต
  • โœ… Section 5.5.1: ์ƒํƒœ ์ฝ”๋“œ๋ฅผ ํฌํ•จํ•œ ์ ์ ˆํ•œ close ํ”„๋ ˆ์ž„ ์ฒ˜๋ฆฌ
  • โœ… Section 7.1.4: ์—ฐ๊ฒฐ ์ข…๋ฃŒ ์‹œํ€€์Šค

๋…ผ๋ธ”๋กœํ‚น ์“ฐ๊ธฐ ์ž‘์—…โ€‹

@Override
public void sendText(String message) throws IOException {
scheduleWrite(ByteBuffer.wrap(frameEncoder.encodeText(message)));
}

private void scheduleWrite(ByteBuffer buf) {
pendingWrites.add(buf);
SelectionKey key = channel.keyFor(selector);
if (key != null && key.isValid() && (key.interestOps() & OP_WRITE) == 0) {
key.interestOps(key.interestOps() | OP_WRITE);
selector.wakeup();
}
}

@Override
public void write(SelectionKey key) throws Exception {
ByteBuffer buf;
while ((buf = pendingWrites.peek()) != null) {
channel.write(buf);
if (buf.hasRemaining()) return; // ๋‹ค์Œ ์“ฐ๊ธฐ ์ด๋ฒคํŠธ ๋Œ€๊ธฐ
pendingWrites.poll();
}

if (pendingWrites.isEmpty()) {
key.interestOps(key.interestOps() & ~OP_WRITE);

// close๊ฐ€ ๋Œ€๊ธฐ ์ค‘์ด์—ˆ๋‹ค๋ฉด ์ด์ œ ์ฑ„๋„ ์ข…๋ฃŒ
if (isClosePending && open) {
open = false;
channel.close();
}
}
}

Close ์ฝ”๋“œ (RFC 6455 Section 7.4)โ€‹

ํ‘œ์ค€ Close ์ฝ”๋“œโ€‹

public enum CloseCodes implements CloseCode {
NORMAL_CLOSURE(1000), // ์ •์ƒ ์ข…๋ฃŒ
GOING_AWAY(1001), // ์—”๋“œํฌ์ธํŠธ ์ข…๋ฃŒ
PROTOCOL_ERROR(1002), // ํ”„๋กœํ† ์ฝœ ์˜ค๋ฅ˜
CANNOT_ACCEPT(1003), // ๋ฐ์ดํ„ฐ ํƒ€์ž…์„ ๋ฐ›์„ ์ˆ˜ ์—†์Œ
RESERVED(1004), // ์˜ˆ์•ฝ๋จ
NO_STATUS_CODE(1005), // ์ƒํƒœ ์ฝ”๋“œ ์—†์Œ
CLOSED_ABNORMALLY(1006), // ๋น„์ •์ƒ ์ข…๋ฃŒ
NOT_CONSISTENT(1007), // ์œ ํšจํ•˜์ง€ ์•Š์€ UTF-8 ๋˜๋Š” ์ž˜๋ชป๋œ ํƒ€์ž…
VIOLATED_POLICY(1008), // ์ •์ฑ… ์œ„๋ฐ˜
TOO_BIG(1009), // ๋ฉ”์‹œ์ง€๊ฐ€ ๋„ˆ๋ฌด ํผ
NO_EXTENSION(1010), // ํ™•์žฅ์ด ํ˜‘์ƒ๋˜์ง€ ์•Š์Œ
UNEXPECTED_CONDITION(1011), // ์˜ˆ์ƒ์น˜ ๋ชปํ•œ ์„œ๋ฒ„ ์ƒํƒœ
SERVICE_RESTART(1012), // ์„œ๋น„์Šค ์žฌ์‹œ์ž‘
TRY_AGAIN_LATER(1013), // ๋‚˜์ค‘์— ๋‹ค์‹œ ์‹œ๋„
TLS_HANDSHAKE_FAILURE(1015); // TLS ํ•ธ๋“œ์…ฐ์ดํฌ ์‹คํŒจ

public static CloseCode getCloseCode(int code) {
// RFC 6455 Section 7.4.2: ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ํŠน์ • ์ฝ”๋“œ (3000-4999)
if (code >= 3000 && code < 5000) {
return new CloseCode() {
@Override
public int getCode() { return code; }
};
}

// ํ‘œ์ค€ close ์ฝ”๋“œ
return switch (code) {
case 1000 -> NORMAL_CLOSURE;
case 1001 -> GOING_AWAY;
// ... ๊ธฐํƒ€ ํ‘œ์ค€ ์ฝ”๋“œ
default -> throw new IllegalArgumentException("Invalid close code: " + code);
};
}
}

RFC 6455 ์ค€์ˆ˜:

  • โœ… Section 7.4.1: ๋ชจ๋“  ํ‘œ์ค€ ์ƒํƒœ ์ฝ”๋“œ ๊ตฌํ˜„ (1000-1015)
  • โœ… Section 7.4.2: ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ํŠน์ • ์ฝ”๋“œ ์ง€์› (3000-4999)

์–ด๋…ธํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜ ์—”๋“œํฌ์ธํŠธ ์ •์˜โ€‹

WebSocket ํ•ธ๋“ค๋Ÿฌโ€‹

@WebSocketHandler("/ws/chat/{roomId}")
public class ChatWebSocketHandler {

@OnOpen
public void onOpen(@SocketSession WebSocketSession session,
@PathVariable("roomId") String roomId) {
System.out.println("New connection to room: " + roomId);
session.getUserProperties().put("roomId", roomId);
}

@MessageMapping("/message")
public void handleMessage(@Payload String message,
@SocketSession WebSocketSession session,
@PathVariable("roomId") String roomId) throws IOException {
// ๋ฐฉ์˜ ๋ชจ๋“  ์„ธ์…˜์— ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ
String response = "[Room " + roomId + "] " + message;
session.sendText(response);
}

@OnClose
public void onClose(@SocketSession WebSocketSession session,
CloseCode closeCode) {
System.out.println("Connection closed: " + closeCode.getCode());
}

@OnError
public void onError(@SocketSession WebSocketSession session,
Throwable error) {
System.err.println("WebSocket error: " + error.getMessage());
}
}

์—”๋“œํฌ์ธํŠธ ๋“ฑ๋กโ€‹

@Component
public class WebSocketEndpointRegistry {
private final Map<PathPattern, WebSocketEndpointInfo> endpointMappings =
new ConcurrentHashMap<>();

public WebSocketEndpointInfo getEndpointInfo(String path) {
for (Map.Entry<PathPattern, WebSocketEndpointInfo> entry :
endpointMappings.entrySet()) {
if (entry.getKey().matches(path)) {
return entry.getValue();
}
}
return null;
}

public void registerEndpoint(PathPattern pathPattern, Object handlerBean,
Method onOpenMethod, Method onCloseMethod,
Method onErrorMethod,
Map<String, Method> messageMappings) {
WebSocketEndpointInfo info = new WebSocketEndpointInfo(
pathPattern, handlerBean, onOpenMethod, onCloseMethod,
onErrorMethod, messageMappings
);
endpointMappings.put(pathPattern, info);
}
}

์„ธ์…˜ ์ปจํ…Œ์ด๋„ˆโ€‹

์„ธ์…˜ ๊ด€๋ฆฌโ€‹

@Component
public class DefaultWebSocketContainer implements WebSocketContainer, CloseListener {
// path -> (sessionId -> session)
private final Map<String, Map<String, WebSocketSession>> sessionStore =
new ConcurrentHashMap<>();

@Override
public void addSession(String path, WebSocketSession session) {
sessionStore.computeIfAbsent(path, k -> new ConcurrentHashMap<>())
.put(session.getId(), session);
}

@Override
public void removeSession(String path, String sessionId) {
Map<String, WebSocketSession> sessions = sessionStore.get(path);
if (sessions != null) {
sessions.remove(sessionId);
}
}

@Override
public Collection<WebSocketSession> getSessions(String path) {
return sessionStore.getOrDefault(path, Map.of()).values();
}

@Override
public void onSessionClosed(WebSocketSession session) {
this.removeSession(session.getRequestPath(), session.getId());
}
}

๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์ ์ธ ์ŠคํŠธ๋ฆฌ๋ฐโ€‹

๋Œ€์šฉ๋Ÿ‰ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌโ€‹

์ŠคํŠธ๋ฆฌ๋ฐ ์•„ํ‚คํ…์ฒ˜๋Š” ์ „์ฒด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ๋กœ๋“œํ•˜๋Š” ๊ฒƒ์„ ํ”ผํ•ฉ๋‹ˆ๋‹ค:

public class WebSocketFrame {
private final boolean fin;
private final int opcode;
private final InputStream payloadStream; // ์ŠคํŠธ๋ฆฌ๋ฐ ํŽ˜์ด๋กœ๋“œ

public byte[] getPayloadBytes() throws IOException {
// ๋ช…์‹œ์ ์œผ๋กœ ํ•„์š”ํ•  ๋•Œ๋งŒ ๋กœ๋“œ
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] data = new byte[4096];
int nRead;
while ((nRead = payloadStream.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
return buffer.toByteArray();
}
}

์ด์ :

  • ๐Ÿ“ฆ ๋Œ€์šฉ๋Ÿ‰ ํŒŒ์ผ ์ „์†ก: ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ๊ณ ๊ฐˆ์‹œํ‚ค์ง€ ์•Š๊ณ  GB ํฌ๊ธฐ์˜ ํŒŒ์ผ ์ŠคํŠธ๋ฆฌ๋ฐ
  • ๐Ÿš€ ๋‚ฎ์€ ์ง€์—ฐ์‹œ๊ฐ„: ์ „์ฒด ๋ฉ”์‹œ์ง€๊ฐ€ ๋„์ฐฉํ•˜๊ธฐ ์ „์— ์ฒ˜๋ฆฌ ์‹œ์ž‘
  • ๐Ÿ’พ ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์„ฑ: ๋ฉ”์‹œ์ง€ ํฌ๊ธฐ์— ๊ด€๊ณ„์—†์ด ๊ณ ์ •๋œ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ

RFC 6455 ์ค€์ˆ˜ ์š”์•ฝโ€‹

๊ตฌํ˜„๋œ ๊ธฐ๋Šฅโ€‹

RFC Section๊ธฐ๋Šฅ์ƒํƒœ
4.1ํด๋ผ์ด์–ธํŠธ ํ•ธ๋“œ์…ฐ์ดํฌโœ… ๊ฒ€์ฆ๋จ
4.2์„œ๋ฒ„ ํ•ธ๋“œ์…ฐ์ดํฌโœ… ๊ตฌํ˜„๋จ
4.2.2Sec-WebSocket-Acceptโœ… SHA-1 + Base64
5.1ํ”„๋ ˆ์ด๋ฐ ๊ฐœ์š”โœ… ์™„๋ฃŒ
5.2๊ธฐ๋ณธ ํ”„๋ ˆ์ด๋ฐโœ… ๋ชจ๋“  ํ•„๋“œ ํŒŒ์‹ฑ๋จ
5.3๋งˆ์Šคํ‚นโœ… XOR ์•Œ๊ณ ๋ฆฌ์ฆ˜
5.4๋‹จํŽธํ™”โœ… FIN ๋น„ํŠธ ์ง€์›
5.5์ œ์–ด ํ”„๋ ˆ์ž„โœ… Ping, Pong, Close
5.5.1Close ํ”„๋ ˆ์ž„โœ… ์ƒํƒœ ์ฝ”๋“œ ํฌํ•จ
5.5.2Ping/Pongโœ… ์ž๋™ ์‘๋‹ต
5.6๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„โœ… ํ…์ŠคํŠธ (UTF-8)
5.7๋ฐ”์ด๋„ˆ๋ฆฌ ํ”„๋ ˆ์ž„โœ… ์›์‹œ ๋ฐ”์ดํŠธ
7.4์ƒํƒœ ์ฝ”๋“œโœ… ๋ชจ๋“  ํ‘œ์ค€ ์ฝ”๋“œ

๊ฒ€์ฆ ํ™•์ธโ€‹

ํ•ธ๋“œ์…ฐ์ดํฌ ๊ฒ€์ฆ:

// RFC 6455 Section 4.2.1: ํ•„์ˆ˜ ์š”์ฒญ ํ—ค๋”
โœ… Upgrade: websocket
โœ… Connection: Upgrade
โœ… Sec-WebSocket-Key: base64-encoded-value
โœ… Sec-WebSocket-Version: 13

ํ”„๋ ˆ์ž„ ๊ฒ€์ฆ:

// RFC 6455 Section 5.1: ํด๋ผ์ด์–ธํŠธ์—์„œ ์„œ๋ฒ„๋กœ์˜ ๋งˆ์Šคํ‚น ์š”๊ตฌ์‚ฌํ•ญ
โœ… ํด๋ผ์ด์–ธํŠธ ํ”„๋ ˆ์ž„์€ ๋ฐ˜๋“œ์‹œ ๋งˆ์Šคํ‚น๋˜์–ด์•ผ ํ•จ (masked bit = 1)
โœ… ์„œ๋ฒ„ ํ”„๋ ˆ์ž„์€ ๋งˆ์Šคํ‚น๋˜์–ด์„œ๋Š” ์•ˆ ๋จ (masked bit = 0)

// RFC 6455 Section 5.5: ์ œ์–ด ํ”„๋ ˆ์ž„ ์ œ์•ฝ์‚ฌํ•ญ
โœ… ์ œ์–ด ํ”„๋ ˆ์ž„์€ ํŽ˜์ด๋กœ๋“œ๊ฐ€ 125๋ฐ”์ดํŠธ ์ดํ•˜์—ฌ์•ผ ํ•จ
โœ… ์ œ์–ด ํ”„๋ ˆ์ž„์€ ๋‹จํŽธํ™”๋˜์–ด์„œ๋Š” ์•ˆ ๋จ (FIN = 1)

Close ๊ฒ€์ฆ:

// RFC 6455 Section 7.1.4: Close ํ”„๋ ˆ์ž„ ์š”๊ตฌ์‚ฌํ•ญ
โœ… ์ฒซ 2๋ฐ”์ดํŠธ์— Close ์ƒํƒœ ์ฝ”๋“œ (Big-Endian)
โœ… ์ƒํƒœ ์ฝ”๋“œ ์ดํ›„ ์„ ํƒ์  UTF-8 ์ด์œ 
โœ… ์„œ๋ฒ„๊ฐ€ ์ƒํ˜ธ close ํ”„๋ ˆ์ž„ ์ „์†ก

์„ฑ๋Šฅ ํŠน์„ฑโ€‹

NIO ๊ธฐ๋ฐ˜ ํ™•์žฅ์„ฑโ€‹

๋‹จ์ผ ์ด๋ฒคํŠธ ๋ฃจํ”„ ์Šค๋ ˆ๋“œ:
โ”œโ”€โ”€ ๋ชจ๋“  ์—ฐ๊ฒฐ ์ˆ˜๋ฝ
โ”œโ”€โ”€ ๋ชจ๋“  ํ™œ์„ฑ WebSocket ์„ธ์…˜์œผ๋กœ๋ถ€ํ„ฐ ์ฝ๊ธฐ
โ”œโ”€โ”€ ๋Œ€๊ธฐ ์ค‘์ธ ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๋Š” ์„ธ์…˜์— ์“ฐ๊ธฐ
โ””โ”€โ”€ ์ˆ˜์ฒœ ๊ฐœ์˜ ๋™์‹œ ์—ฐ๊ฒฐ ์ฒ˜๋ฆฌ

์›Œ์ปค ์Šค๋ ˆ๋“œ ํ’€ (๊ฐ€์ƒ ์Šค๋ ˆ๋“œ):
โ”œโ”€โ”€ ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ
โ”œโ”€โ”€ ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง ์‹คํ–‰
โ””โ”€โ”€ ๋ฉ”์‹œ์ง€ ๋””์ŠคํŒจ์นญ

์„ฑ๋Šฅ ์ง€ํ‘œ:

  • ๋™์‹œ ์—ฐ๊ฒฐ: ์ธ์Šคํ„ด์Šค๋‹น 10,000๊ฐœ ์ด์ƒ
  • ์—ฐ๊ฒฐ๋‹น ๋ฉ”๋ชจ๋ฆฌ: ~8KB (๋ฒ„ํผ + ์„ธ์…˜ ์ƒํƒœ)
  • ๋ฉ”์‹œ์ง€ ์ง€์—ฐ์‹œ๊ฐ„: < 1ms (๋ฐ”์ด๋„ˆ๋ฆฌ์˜ ๊ฒฝ์šฐ ์ง๋ ฌํ™” ์˜ค๋ฒ„ํ—ค๋“œ ์—†์Œ)
  • ์ฒ˜๋ฆฌ๋Ÿ‰: ๊ตฌํ˜„์ด ์•„๋‹Œ ๋„คํŠธ์›Œํฌ ๋Œ€์—ญํญ์— ์˜ํ•ด ์ œํ•œ๋จ

์ŠคํŠธ๋ฆฌ๋ฐ ์ด์ โ€‹

// ์ „ํ†ต์ ์ธ ์ ‘๊ทผ: ์ „์ฒด ๋ฉ”์‹œ์ง€ ๋กœ๋“œ
byte[] payload = new byte[payloadLength]; // 1GB ํ• ๋‹น!
in.read(payload);

// Sprout ์ ‘๊ทผ: ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ
InputStream payloadStream = new LimitedInputStream(in, payloadLength);
// ์ผ์ •ํ•œ ๋ฉ”๋ชจ๋ฆฌ๋กœ ์ ์ง„์  ์ฒ˜๋ฆฌ

๋ชจ๋ฒ” ์‚ฌ๋ก€โ€‹

1. ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ์—๋Š” ๋ฐ”์ด๋„ˆ๋ฆฌ ์‚ฌ์šฉโ€‹

@MessageMapping("/file")
public void handleFileUpload(@Payload InputStream stream,
@SocketSession WebSocketSession session)
throws IOException {
// ๋ฉ”๋ชจ๋ฆฌ์— ๋กœ๋“œํ•˜์ง€ ์•Š๊ณ  ๋””์Šคํฌ๋กœ ์ง์ ‘ ์ŠคํŠธ๋ฆฌ๋ฐ
try (FileOutputStream out = new FileOutputStream("/tmp/upload")) {
stream.transferTo(out);
}
}

2. ํ•˜ํŠธ๋น„ํŠธ ๊ตฌํ˜„โ€‹

@WebSocketHandler("/ws/monitor")
public class MonitoringHandler {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);

@OnOpen
public void onOpen(@SocketSession WebSocketSession session) {
// 30์ดˆ๋งˆ๋‹ค ping ์ „์†ก
scheduler.scheduleAtFixedRate(() -> {
try {
session.sendPing(new byte[0]);
} catch (IOException e) {
// ์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ
}
}, 30, 30, TimeUnit.SECONDS);
}
}

3. ๋ฐฑํ”„๋ ˆ์…” ์ฒ˜๋ฆฌโ€‹

@MessageMapping("/stream")
public void handleStream(@Payload InputStream stream,
@SocketSession WebSocketSession session)
throws IOException {
byte[] buffer = new byte[8192];
int bytesRead;

while ((bytesRead = stream.read(buffer)) != -1) {
// ์ฒญํฌ ์ฒ˜๋ฆฌ
processChunk(buffer, bytesRead);

// ํ•„์š”์‹œ ๋ฐฑํ”„๋ ˆ์…” ์ ์šฉ
if (pendingWrites.size() > 10) {
Thread.sleep(100); // ์†๋„ ์กฐ์ ˆ
}
}
}

4. ์šฐ์•„ํ•œ ์ข…๋ฃŒโ€‹

@OnClose
public void onClose(@SocketSession WebSocketSession session,
CloseCode closeCode) {
// ๋ฆฌ์†Œ์Šค ์ •๋ฆฌ
session.getUserProperties().clear();

// ์•„์ง ๋ณด๋‚ด์ง€ ์•Š์•˜๋‹ค๋ฉด close ํ”„๋ ˆ์ž„ ์ „์†ก
if (session.isOpen() && closeCode.getCode() == 1000) {
try {
session.close();
} catch (IOException e) {
// ์˜ค๋ฅ˜ ๋กœ๊ทธ
}
}
}

ํ™•์žฅ ํฌ์ธํŠธโ€‹

์ปค์Šคํ…€ ํ”„๋ ˆ์ž„ ํ•ธ๋“ค๋Ÿฌโ€‹

@Component
public class CustomFrameHandler implements FrameHandler {
@Override
public boolean canHandle(WebSocketFrame frame, FrameProcessingContext context) {
// ์ด ํ•ธ๋“ค๋Ÿฌ๊ฐ€ ํ”„๋ ˆ์ž„ ํƒ€์ž…์„ ์ง€์›ํ•˜๋Š”์ง€ ํ™•์ธ
return frame.getOpcode() == CUSTOM_OPCODE;
}

@Override
public void handle(WebSocketFrame frame, WebSocketSession session,
Map<String, String> pathVars) throws Exception {
// ์ปค์Šคํ…€ ํ”„๋ ˆ์ž„ ์ฒ˜๋ฆฌ ๋กœ์ง
}
}

์ปค์Šคํ…€ ๋ฉ”์‹œ์ง€ ๋””์ŠคํŒจ์ฒ˜โ€‹

@Component
public class ProtobufMessageDispatcher extends AbstractWebSocketMessageDispatcher {
@Override
protected ParsedMessage doParse(InputStream payloadStream) throws Exception {
// protobuf ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ
MyProto.Message message = MyProto.Message.parseFrom(payloadStream);
return new ParsedMessage(message.getAction(), message);
}
}

Sprout์˜ WebSocket ๊ตฌํ˜„์€ ํ˜„๋Œ€์ ์ธ NIO ์•„ํ‚คํ…์ฒ˜, ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์ ์ธ ์ŠคํŠธ๋ฆฌ๋ฐ, ์œ ์—ฐํ•œ ์–ด๋…ธํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ชจ๋ธ์„ ๊ฐ–์ถ˜ ํ”„๋กœ๋•์…˜ ์ค€๋น„๊ฐ€ ์™„๋ฃŒ๋œ RFC ์ค€์ˆ˜ WebSocket ์„œ๋ฒ„๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ตฌํ˜„์€ ํ™•์žฅ์„ฑ, ํ‘œ์ค€ ์ค€์ˆ˜, ๊ฐœ๋ฐœ์ž ๊ฒฝํ—˜์„ ์šฐ์„ ์‹œํ•ฉ๋‹ˆ๋‹ค.