// ByteBufferPool.java

package sprout.server;

import sprout.beans.InfrastructureBean;
import sprout.beans.annotation.Component;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Pool of reusable {@link ByteBuffer}s organised into fixed-capacity size classes.
 *
 * <p>A request is served from the smallest registered size class whose capacity is at
 * least the requested size; requests larger than every registered class are allocated
 * directly and never pooled. Three default classes ({@link #SMALL_BUFFER_SIZE},
 * {@link #MEDIUM_BUFFER_SIZE}, {@link #LARGE_BUFFER_SIZE}) are registered by the
 * constructors; more can be added with {@link #initializePool(int, int)}.
 *
 * <p>All shared state lives in concurrent collections and atomic counters, so the public
 * methods may be invoked from multiple threads. The pool does not detect a buffer being
 * released twice or being used after release; callers must hand back each buffer exactly
 * once and stop using it afterwards.
 */
@Component
public class ByteBufferPool implements InfrastructureBean {

    /** State of a single size class: its idle buffers, its capacity limit and its counters. */
    private static class PoolConfig {
        final int bufferSize;
        final int maxPoolSize;
        final ConcurrentLinkedQueue<ByteBuffer> pool;
        // Tracks how many slots of the queue are taken. Maintained separately because
        // ConcurrentLinkedQueue.size() walks the whole queue and cannot enforce the
        // limit atomically.
        final AtomicInteger pooledCount = new AtomicInteger(0);
        final AtomicLong acquireCount = new AtomicLong(0);
        final AtomicLong releaseCount = new AtomicLong(0);
        final AtomicLong allocateCount = new AtomicLong(0);

        PoolConfig(int bufferSize, int maxPoolSize) {
            this.bufferSize = bufferSize;
            this.maxPoolSize = maxPoolSize;
            this.pool = new ConcurrentLinkedQueue<>();
        }
    }

    // Predefined buffer sizes
    public static final int SMALL_BUFFER_SIZE = 2048;      // 2KB for protocol detection
    public static final int MEDIUM_BUFFER_SIZE = 8192;     // 8KB for read operations
    public static final int LARGE_BUFFER_SIZE = 32768;     // 32KB for large responses

    private static final int DEFAULT_MAX_POOL_SIZE = 500;

    // Sorted by buffer capacity so acquire() can find the smallest class that fits.
    private final ConcurrentSkipListMap<Integer, PoolConfig> pools;
    private final boolean useDirect;

    /** Creates a pool that hands out heap buffers. */
    public ByteBufferPool() {
        this(false);
    }

    /**
     * Creates a pool with the three default size classes registered.
     *
     * @param useDirect {@code true} to allocate direct buffers, {@code false} for heap buffers
     */
    public ByteBufferPool(boolean useDirect) {
        this.useDirect = useDirect;
        this.pools = new ConcurrentSkipListMap<>();

        // Registered through the private helper so that a subclass overriding
        // initializePool() is never invoked on a partially constructed instance.
        registerPool(SMALL_BUFFER_SIZE, DEFAULT_MAX_POOL_SIZE);
        registerPool(MEDIUM_BUFFER_SIZE, DEFAULT_MAX_POOL_SIZE);
        registerPool(LARGE_BUFFER_SIZE, DEFAULT_MAX_POOL_SIZE / 5); // Fewer large buffers
    }

    /**
     * Registers a size class, replacing any existing class of the same buffer size
     * (the replaced class's idle buffers and counters are dropped).
     *
     * @param bufferSize  capacity in bytes of the buffers in this class; must be positive
     * @param maxPoolSize maximum number of idle buffers retained; a value of zero or less
     *                    means released buffers of this class are never retained
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public void initializePool(int bufferSize, int maxPoolSize) {
        registerPool(bufferSize, maxPoolSize);
    }

    /** Validates and stores a size class; shared by the constructor and initializePool(). */
    private void registerPool(int bufferSize, int maxPoolSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        pools.put(bufferSize, new PoolConfig(bufferSize, maxPoolSize));
    }

    /**
     * Returns a cleared buffer whose capacity is at least {@code size}.
     *
     * <p>The buffer comes from the smallest registered size class that fits. If no class
     * is large enough, a buffer of exactly {@code size} bytes is allocated and is not
     * tracked by the pool.
     *
     * @param size minimum required capacity in bytes
     * @return a buffer with position 0, limit equal to capacity and big-endian byte order
     */
    public ByteBuffer acquire(int size) {
        Map.Entry<Integer, PoolConfig> entry = pools.ceilingEntry(size);
        if (entry == null) {
            // No size class is large enough, allocate directly
            return allocateBuffer(size);
        }

        PoolConfig config = entry.getValue();
        config.acquireCount.incrementAndGet();

        ByteBuffer buffer = config.pool.poll();
        if (buffer != null) {
            config.pooledCount.decrementAndGet();
            // clear() resets position/limit/mark only; restore the default byte order too
            // so a recycled buffer is indistinguishable from a freshly allocated one.
            buffer.clear();
            buffer.order(ByteOrder.BIG_ENDIAN);
            return buffer;
        }

        // Pool is empty, allocate new buffer
        config.allocateCount.incrementAndGet();
        return allocateBuffer(config.bufferSize);
    }

    /**
     * Hands a buffer back to the pool.
     *
     * <p>The buffer is retained only if its capacity equals a registered size class, it is
     * writable, its direct/heap kind matches this pool, and the class is not full.
     * Otherwise it is simply dropped. {@code null} is ignored.
     *
     * @param buffer the buffer to release; must not be used by the caller afterwards
     */
    public void release(ByteBuffer buffer) {
        if (buffer == null) {
            return;
        }

        PoolConfig config = pools.get(buffer.capacity());
        if (config == null) {
            // Not a pooled size, let it be GC'd
            return;
        }

        config.releaseCount.incrementAndGet();

        // A foreign buffer that merely shares a pooled capacity must not be handed to later
        // callers: they expect a writable buffer of the kind this pool allocates.
        if (buffer.isReadOnly() || buffer.isDirect() != useDirect) {
            return;
        }

        if (!reserveSlot(config)) {
            // Pool is full, discard buffer (will be GC'd)
            return;
        }

        // Clear buffer and return to pool
        buffer.clear();
        config.pool.offer(buffer);
    }

    /**
     * Atomically claims one slot in the size class.
     *
     * @return {@code true} if a slot was claimed, {@code false} if the class is at its limit
     */
    private static boolean reserveSlot(PoolConfig config) {
        while (true) {
            int current = config.pooledCount.get();
            if (current >= config.maxPoolSize) {
                return false;
            }
            if (config.pooledCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    private ByteBuffer allocateBuffer(int size) {
        return useDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
    }

    /**
     * Returns a snapshot of the counters for one size class.
     *
     * @param bufferSize the exact buffer size of a registered class
     * @return the snapshot, or {@code null} if no class with that size is registered
     */
    public PoolStats getStats(int bufferSize) {
        PoolConfig config = pools.get(bufferSize);
        if (config == null) {
            return null;
        }

        // acquire() bumps acquireCount before allocateCount, so reading allocateCount first
        // guarantees the snapshot never shows more allocations than acquires.
        long allocations = config.allocateCount.get();
        long releases = config.releaseCount.get();
        long acquires = config.acquireCount.get();

        return new PoolStats(
            bufferSize,
            config.pooledCount.get(),
            config.maxPoolSize,
            acquires,
            releases,
            allocations
        );
    }

    /** Immutable snapshot of one size class's occupancy and counters. */
    public static class PoolStats {
        public final int bufferSize;
        public final int currentPoolSize;
        public final int maxPoolSize;
        public final long acquireCount;
        public final long releaseCount;
        public final long allocateCount;

        public PoolStats(int bufferSize, int currentPoolSize, int maxPoolSize,
                         long acquireCount, long releaseCount, long allocateCount) {
            this.bufferSize = bufferSize;
            this.currentPoolSize = currentPoolSize;
            this.maxPoolSize = maxPoolSize;
            this.acquireCount = acquireCount;
            this.releaseCount = releaseCount;
            this.allocateCount = allocateCount;
        }

        /**
         * @return percentage of acquires served without a new allocation,
         *         or {@code 0.0} when nothing has been acquired
         */
        public double getHitRate() {
            if (acquireCount == 0) {
                return 0.0;
            }
            long hits = acquireCount - allocateCount;
            return (hits * 100.0) / acquireCount;
        }

        /**
         * @return idle buffers as a percentage of {@code maxPoolSize},
         *         or {@code 0.0} when {@code maxPoolSize} is zero
         */
        public double getUtilization() {
            if (maxPoolSize == 0) {
                return 0.0;
            }
            return (currentPoolSize * 100.0) / maxPoolSize;
        }

        @Override
        public String toString() {
            return String.format(
                "PoolStats{size=%d, pool=%d/%d, acquires=%d, releases=%d, allocations=%d, hitRate=%.2f%%, utilization=%.2f%%}",
                bufferSize, currentPoolSize, maxPoolSize, acquireCount, releaseCount,
                allocateCount, getHitRate(), getUtilization()
            );
        }
    }

    /** Drops every idle buffer from every size class; counters other than occupancy are kept. */
    public void clear() {
        for (PoolConfig config : pools.values()) {
            // Drain one by one so the occupancy counter stays in step with the queue even
            // while other threads acquire or release concurrently.
            while (config.pool.poll() != null) {
                config.pooledCount.decrementAndGet();
            }
        }
    }

    /**
     * @return the number of idle buffers across all size classes; under concurrent use
     *         this is a point-in-time approximation
     */
    public int getTotalBuffersInPool() {
        return pools.values().stream()
                .mapToInt(config -> config.pooledCount.get())
                .sum();
    }
}