This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 406144b IGNITE-13515 Performance drop when there are many thin clients per server. This closes #8307 406144b is described below commit 406144b33df3526a6055356333c158c8f89a38ce Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Tue Oct 6 15:05:14 2020 +0300 IGNITE-13515 Performance drop when there are many thin clients per server. This closes #8307 --- .../org/apache/ignite/IgniteSystemProperties.java | 10 +- .../ClientConnectorConfiguration.java | 38 ++- .../internal/binary/BinaryThreadLocalContext.java | 2 +- .../binary/streams/BinaryHeapOutputStream.java | 2 +- .../binary/streams/BinaryMemoryAllocator.java | 280 ++++++++++++++++++--- .../binary/streams/BinaryMemoryAllocatorChunk.java | 73 +----- .../internal/client/thin/TcpClientChannel.java | 137 +++++++--- .../odbc/ClientListenerMessageParser.java | 8 +- .../processors/odbc/ClientListenerNioListener.java | 14 +- ...er.java => ClientListenerNioMessageParser.java} | 68 +++-- .../odbc/ClientListenerNioServerBuffer.java | 113 --------- .../processors/odbc/ClientListenerProcessor.java | 25 +- .../internal/processors/odbc/ClientMessage.java | 189 ++++++++++++++ .../odbc/jdbc/JdbcConnectionContext.java | 4 +- .../processors/odbc/jdbc/JdbcMessageParser.java | 19 +- .../odbc/odbc/OdbcConnectionContext.java | 4 +- .../processors/odbc/odbc/OdbcMessageParser.java | 19 +- .../platform/client/ClientMessageParser.java | 18 +- .../internal/util/io/GridUnsafeDataInput.java | 2 +- .../internal/util/io/GridUnsafeDataOutput.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 41 +-- .../internal/binary/BinaryMarshallerSelfTest.java | 14 +- 22 files changed, 718 insertions(+), 364 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 10197d0..7aa506b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -53,7 +53,8 @@ import static org.apache.ignite.internal.IgniteKernal.DFLT_PERIODIC_STARVATION_C import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD; import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT; import static org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION; -import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK; import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DFLT_DISCOVERY_HISTORY_SIZE; import static org.apache.ignite.internal.processors.affinity.AffinityAssignment.DFLT_AFFINITY_BACKUPS_THRESHOLD; import static org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache.DFLT_AFFINITY_HISTORY_SIZE; @@ -529,6 +530,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_MARSHAL_BUFFERS_RECHECK = "IGNITE_MARSHAL_BUFFERS_RECHECK"; /** + * System property to specify per thread binary allocator chunk pool size. Default value is {@code 32}. + */ + @SystemProperty(value = "Per thread binary allocator chunk pool size.", + type = Integer.class, defaults = "" + DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE) + public static final String IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = "IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE"; + + /** * System property to disable {@link HostnameVerifier} for SSL connections. * Can be used for development with self-signed certificates. Default value is {@code false}. */ diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java index a6cfcbd..b1f0c68 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java @@ -45,6 +45,9 @@ public class ClientConnectorConfiguration { /** Default size of thread pool. */ public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT; + /** Default selector count. */ + public static final int DFLT_SELECTOR_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); + /** Default handshake timeout. */ public static final int DFLT_HANDSHAKE_TIMEOUT = 10_000; @@ -78,6 +81,9 @@ public class ClientConnectorConfiguration { /** Thread pool size. */ private int threadPoolSize = DFLT_THREAD_POOL_SIZE; + /** Selector count. */ + private int selectorCnt = DFLT_SELECTOR_CNT; + /** Idle timeout. */ private long idleTimeout = DFLT_IDLE_TIMEOUT; @@ -297,21 +303,21 @@ public class ClientConnectorConfiguration { } /** - * Size of thread pool that is in charge of processing SQL requests. + * Size of thread pool that is in charge of processing client requests. * <p> * Defaults {@link #DFLT_THREAD_POOL_SIZE}. * - * @return Thread pool that is in charge of processing SQL requests. + * @return Thread pool that is in charge of processing client requests. */ public int getThreadPoolSize() { return threadPoolSize; } /** - * Sets thread pool that is in charge of processing SQL requests. See {@link #getThreadPoolSize()} for more + * Sets thread pool that is in charge of processing client requests. See {@link #getThreadPoolSize()} for more * information. * - * @param threadPoolSize Thread pool that is in charge of processing SQL requests. + * @param threadPoolSize Thread pool that is in charge of processing client requests. * @return This instance for chaining. */ public ClientConnectorConfiguration setThreadPoolSize(int threadPoolSize) { @@ -321,6 +327,30 @@ public class ClientConnectorConfiguration { } /** + * Get count of selectors to use in TCP server. + * <p> + * Defaults {@link #DFLT_SELECTOR_CNT}. + * + * @return Count of selectors to use in TCP server. + */ + public int getSelectorCount() { + return selectorCnt; + } + + /** + * Sets count of selectors to use in TCP server. See {@link #getSelectorCount()} for more + * information. + * + * @param selectorCnt Count of selectors to use in TCP server. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setSelectorCount(int selectorCnt) { + this.selectorCnt = selectorCnt; + + return this; + } + + /** * Gets idle timeout for client connections. * If no packets come within idle timeout, the connection is closed. * Zero or negative means no timeout. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java index f8da90b..1b68207 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java @@ -32,7 +32,7 @@ public class BinaryThreadLocalContext { }; /** Memory chunk. */ - private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.INSTANCE.chunk(); + private final BinaryMemoryAllocatorChunk chunk = BinaryMemoryAllocator.THREAD_LOCAL.chunk(); /** Schema holder. */ private final BinaryWriterSchemaHolder schema = new BinaryWriterSchemaHolder(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java index 17bcdf6..0f411b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java @@ -37,7 +37,7 @@ public final class BinaryHeapOutputStream extends BinaryAbstractOutputStream { * @param cap Initial capacity. */ public BinaryHeapOutputStream(int cap) { - this(cap, BinaryMemoryAllocator.INSTANCE.chunk()); + this(cap, BinaryMemoryAllocator.THREAD_LOCAL.chunk()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java index 5471bc5..8993fb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java @@ -17,41 +17,265 @@ package org.apache.ignite.internal.binary.streams; +import java.util.ArrayDeque; +import java.util.Arrays; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; + /** - * Thread-local memory allocator. + * On-heap memory allocator. */ -public final class BinaryMemoryAllocator { - /** Memory allocator instance. */ - public static final BinaryMemoryAllocator INSTANCE = new BinaryMemoryAllocator(); - - /** Holders. */ - private static final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>(); - - /** - * Ensures singleton. - */ - private BinaryMemoryAllocator() { - // No-op. - } +public abstract class BinaryMemoryAllocator { + /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */ + public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000; + + /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE */ + public static final int DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = 32; + + /** Buffer size re-check frequency. */ + private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK); + + /** */ + private static final int POOL_SIZE = Integer.getInteger(IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE, DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE); + + /** Thread local allocator instance. */ + public static final BinaryMemoryAllocator THREAD_LOCAL = new ThreadLocalAllocator(); + + /** Pooled allocator instance. */ + public static final BinaryMemoryAllocator POOLED = new PooledAllocator(); + + /** */ + public abstract BinaryMemoryAllocatorChunk chunk(); + + /** */ + public abstract boolean isAcquired(); + + /** */ + private static class ThreadLocalAllocator extends BinaryMemoryAllocator { + /** Holders. */ + private final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Override public BinaryMemoryAllocatorChunk chunk() { + BinaryMemoryAllocatorChunk holder = holders.get(); + + if (holder == null) + holders.set(holder = new Chunk()); + + return holder; + } + + /** + * Checks whether a thread-local array is acquired or not. + * The function is used by Unit tests. + * + * @return {@code true} if acquired {@code false} otherwise. + */ + @Override public boolean isAcquired() { + BinaryMemoryAllocatorChunk holder = holders.get(); + + return holder != null && holder.isAcquired(); + } + + /** + * Memory allocator chunk. + */ + private static class Chunk implements BinaryMemoryAllocatorChunk { + /** Data array */ + private byte[] data; + + /** Max message size detected between checks. */ + private int maxMsgSize; + + /** Last time array size is checked. */ + private long lastCheckNanos = System.nanoTime(); + + /** Whether the holder is acquired or not. */ + private boolean acquired; + + /** {@inheritDoc} */ + @Override public byte[] allocate(int size) { + if (acquired) + return new byte[size]; + + acquired = true; + + if (data == null || size > data.length) + data = new byte[size]; + + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] reallocate(byte[] data, int size) { + byte[] newData = new byte[size]; + + if (this.data == data) + this.data = newData; + + System.arraycopy(data, 0, newData, 0, data.length); + + return newData; + } + + /** {@inheritDoc} */ + @Override public void release(byte[] data, int maxMsgSize) { + if (this.data != data) + return; - public BinaryMemoryAllocatorChunk chunk() { - BinaryMemoryAllocatorChunk holder = holders.get(); + if (maxMsgSize > this.maxMsgSize) + this.maxMsgSize = maxMsgSize; - if (holder == null) - holders.set(holder = new BinaryMemoryAllocatorChunk()); + acquired = false; - return holder; + long nowNanos = System.nanoTime(); + + if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) { + int halfSize = data.length >> 1; + + if (this.maxMsgSize < halfSize) + this.data = new byte[halfSize]; + + lastCheckNanos = nowNanos; + } + } + + /** {@inheritDoc} */ + @Override public boolean isAcquired() { + return acquired; + } + } } - /** - * Checks whether a thread-local array is acquired or not. - * The function is used by Unit tests. - * - * @return {@code true} if acquired {@code false} otherwise. - */ - public boolean isAcquired() { - BinaryMemoryAllocatorChunk holder = holders.get(); + /** */ + private static class PooledAllocator extends BinaryMemoryAllocator { + /** */ + private final ThreadLocal<DataHoldersPool> holders = ThreadLocal.withInitial(DataHoldersPool::new); + + /** {@inheritDoc} */ + @Override public BinaryMemoryAllocatorChunk chunk() { + return new Chunk(holders.get()); + } + + /** {@inheritDoc} */ + @Override public boolean isAcquired() { + return false; + } + + /** */ + private static class DataHoldersPool { + /** */ + private final ArrayDeque<DataHolder> pool = new ArrayDeque<>(POOL_SIZE); + + /** */ + public synchronized DataHolder acquire() { + return pool.isEmpty() ? new DataHolder() : pool.pop(); + } + + /** */ + public synchronized void release(DataHolder holder) { + if (pool.size() < POOL_SIZE) pool.push(holder); + } + } + + /** */ + private static class DataHolder { + /** Size history. */ + private final int[] history = new int[128]; + + /** Size history cntr. */ + private int cntr; + + /** Last time array size is checked. */ + private long lastCheckNanos = System.nanoTime(); + + /** Data array */ + private byte[] data; + + /** */ + public byte[] ensureCapacity(int size, boolean copy) { + if (data == null) + data = new byte[size]; + else if (data.length < size) + data = copy ? Arrays.copyOf(data, size) : new byte[size]; + + return data; + } + + /** */ + public boolean corresponds(byte[] data) { + return data != null && this.data == data; + } + + /** */ + public void adjustSize(int msgSize) { + history[cntr % history.length] = msgSize; + cntr = cntr == Integer.MAX_VALUE ? 0 : cntr + 1; + + long now = System.nanoTime(); + if (U.nanosToMillis(now - lastCheckNanos) >= CHECK_FREQ && cntr > history.length) { + lastCheckNanos = now; + + int[] tmp = Arrays.copyOf(history, history.length); + Arrays.sort(tmp); + int adjusted = U.nextPowerOf2(tmp[tmp.length / 2]); + + if (adjusted < data.length) + data = new byte[adjusted]; + } + } + } + + /** */ + private static class Chunk implements BinaryMemoryAllocatorChunk { + /** */ + private volatile DataHolder holder; + + /** */ + private final DataHoldersPool pool; + + /** */ + private Chunk(DataHoldersPool pool) { + this.pool = pool; + } + + /** {@inheritDoc} */ + @Override public byte[] allocate(int size) { + if (holder != null) + return new byte[size]; + + holder = pool.acquire(); + return holder.ensureCapacity(size, false); + } + + /** {@inheritDoc} */ + @Override public byte[] reallocate(byte[] data, int size) { + DataHolder holder0 = holder; + if (holder0 != null && holder0.corresponds(data)) + return holder0.ensureCapacity(size, true); + else + return Arrays.copyOf(data, size); + } + + /** {@inheritDoc} */ + @Override public void release(byte[] data, int msgSize) { + DataHolder holder0 = holder; + if (holder0 == null || !holder0.corresponds(data)) + return; + + holder.adjustSize(msgSize); + + pool.release(holder); + holder = null; + } - return holder != null && holder.isAcquired(); + /** {@inheritDoc} */ + @Override public boolean isAcquired() { + return holder != null; + } + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java index 27570cf..5f2dfee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java @@ -17,50 +17,17 @@ package org.apache.ignite.internal.binary.streams; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; - /** * Memory allocator chunk. */ -public class BinaryMemoryAllocatorChunk { - /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */ - public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000; - - /** Buffer size re-check frequency. */ - private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK); - - /** Data array */ - private byte[] data; - - /** Max message size detected between checks. */ - private int maxMsgSize; - - /** Last time array size is checked. */ - private long lastCheckNanos = System.nanoTime(); - - /** Whether the holder is acquired or not. */ - private boolean acquired; - +public interface BinaryMemoryAllocatorChunk { /** * Allocate. * * @param size Desired size. * @return Data. */ - public byte[] allocate(int size) { - if (acquired) - return new byte[size]; - - acquired = true; - - if (data == null || size > data.length) - data = new byte[size]; - - return data; - } + public byte[] allocate(int size); /** * Reallocate. @@ -69,45 +36,15 @@ public class BinaryMemoryAllocatorChunk { * @param size Size. * @return New data. */ - public byte[] reallocate(byte[] data, int size) { - byte[] newData = new byte[size]; - - if (this.data == data) - this.data = newData; - - System.arraycopy(data, 0, newData, 0, data.length); - - return newData; - } + public byte[] reallocate(byte[] data, int size); /** * Shrinks array size if needed. */ - public void release(byte[] data, int maxMsgSize) { - if (this.data != data) - return; - - if (maxMsgSize > this.maxMsgSize) - this.maxMsgSize = maxMsgSize; - - this.acquired = false; - - long nowNanos = System.nanoTime(); - - if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) { - int halfSize = data.length >> 1; - - if (this.maxMsgSize < halfSize) - this.data = new byte[halfSize]; - - lastCheckNanos = nowNanos; - } - } + public void release(byte[] data, int maxMsgSize); /** * @return {@code True} if acquired. */ - public boolean isAcquired() { - return acquired; - } + public boolean isAcquired(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index acc0ffe..28330e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -410,14 +410,15 @@ class TcpClientChannel implements ClientChannel { * Process next message from the input stream and complete corresponding future. */ private void processNextMessage() throws ClientProtocolError, ClientConnectionException { - int msgSize = dataInput.readInt(); + // blocking read a message header not to fall into a busy loop + int msgSize = dataInput.readInt(2048); if (msgSize <= 0) throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize)); long bytesReadOnStartMsg = dataInput.totalBytesRead(); - long resId = dataInput.readLong(); + long resId = dataInput.spinReadLong(); int status = 0; @@ -426,11 +427,11 @@ class TcpClientChannel implements ClientChannel { BinaryInputStream resIn; if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) { - short flags = dataInput.readShort(); + short flags = dataInput.spinReadShort(); if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) { - long topVer = dataInput.readLong(); - int minorTopVer = dataInput.readInt(); + long topVer = dataInput.spinReadLong(); + int minorTopVer = dataInput.spinReadInt(); srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer); @@ -439,7 +440,7 @@ class TcpClientChannel implements ClientChannel { } if ((flags & ClientFlag.NOTIFICATION) != 0) { - short notificationCode = dataInput.readShort(); + short notificationCode = dataInput.spinReadShort(); notificationOp = ClientOperation.fromCode(notificationCode); @@ -448,10 +449,10 @@ class TcpClientChannel implements ClientChannel { } if ((flags & ClientFlag.ERROR) != 0) - status = dataInput.readInt(); + status = dataInput.spinReadInt(); } else - status = dataInput.readInt(); + status = dataInput.spinReadInt(); int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg); @@ -460,12 +461,12 @@ class TcpClientChannel implements ClientChannel { if (status == 0) { if (msgSize > hdrSize) - res = dataInput.read(msgSize - hdrSize); + res = dataInput.spinRead(msgSize - hdrSize); } else if (status == ClientStatus.SECURITY_VIOLATION) err = new ClientAuthorizationException(); else { - resIn = new BinaryHeapInputStream(dataInput.read(msgSize - hdrSize)); + resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - hdrSize)); String errMsg = ClientUtils.createBinaryReader(null, resIn).readString(); @@ -713,47 +714,81 @@ class TcpClientChannel implements ClientChannel { this.in = in; } + /** Read bytes from the input stream. */ + public byte[] read(int len) throws ClientConnectionException { + byte[] bytes = new byte[len]; + + read(bytes, len, 0); + + return bytes; + } + + /** Read bytes from the input stream. */ + public byte[] spinRead(int len) { + byte[] bytes = new byte[len]; + + read(bytes, len, Integer.MAX_VALUE); + + return bytes; + } + /** * Read bytes from the input stream to the buffer. * * @param bytes Bytes buffer. * @param len Length. + * @param tryReadCnt Number of reads before falling into blocking read. */ - private void read(byte[] bytes, int len) throws ClientConnectionException { - int bytesNum; - int readBytesNum = 0; - - while (readBytesNum < len) { - try { - bytesNum = in.read(bytes, readBytesNum, len - readBytesNum); - } - catch (IOException e) { - throw handleIOError(e); - } + public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException { + int offset = 0; - if (bytesNum < 0) - throw handleIOError(null); + try { + while (offset < len) { + int toRead; - readBytesNum += bytesNum; - } + if (tryReadCnt == 0) + toRead = len - offset; + else if ((toRead = Math.min(in.available(), len - offset)) == 0) { + tryReadCnt--; - totalBytesRead += readBytesNum; - } + continue; + } - /** Read bytes from the input stream. */ - public byte[] read(int len) throws ClientConnectionException { - byte[] bytes = new byte[len]; + int read = in.read(bytes, offset, toRead); - read(bytes, len); + if (read < 0) + throw handleIOError(null); - return bytes; + offset += read; + totalBytesRead += read; + } + } + catch (IOException e) { + throw handleIOError(e); + } } /** * Read long value from the input stream. */ public long readLong() throws ClientConnectionException { - read(tmpBuf, Long.BYTES); + return readLong(0); + } + + /** + * Read long value from the input stream. + */ + public long spinReadLong() throws ClientConnectionException { + return readLong(Integer.MAX_VALUE); + } + + /** + * Read long value from the input stream. + * + * @param tryReadCnt Number of reads before falling into blocking read. + */ + private long readLong(int tryReadCnt) throws ClientConnectionException { + read(tmpBuf, Long.BYTES, tryReadCnt); return BinaryPrimitives.readLong(tmpBuf, 0); } @@ -762,7 +797,23 @@ class TcpClientChannel implements ClientChannel { * Read int value from the input stream. */ public int readInt() throws ClientConnectionException { - read(tmpBuf, Integer.BYTES); + return readInt(0); + } + + /** + * Read int value from the input stream. + */ + public int spinReadInt() throws ClientConnectionException { + return readInt(Integer.MAX_VALUE); + } + + /** + * Read int value from the input stream. + * + * @param tryReadCnt Number of reads before falling into blocking read. + */ + private int readInt(int tryReadCnt) throws ClientConnectionException { + read(tmpBuf, Integer.BYTES, tryReadCnt); return BinaryPrimitives.readInt(tmpBuf, 0); } @@ -771,7 +822,23 @@ class TcpClientChannel implements ClientChannel { * Read short value from the input stream. */ public short readShort() throws ClientConnectionException { - read(tmpBuf, Short.BYTES); + return readShort(0); + } + + /** + * Read short value from the input stream. + */ + public short spinReadShort() throws ClientConnectionException { + return readShort(Integer.MAX_VALUE); + } + + /** + * Read short value from the input stream. + * + * @param tryReadCnt Number of reads before falling into blocking read. + */ + public short readShort(int tryReadCnt) throws ClientConnectionException { + read(tmpBuf, Short.BYTES, tryReadCnt); return BinaryPrimitives.readShort(tmpBuf, 0); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java index c3782ef..4dfc117 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java @@ -27,7 +27,7 @@ public interface ClientListenerMessageParser { * @param msg Message. * @return Request. */ - ClientListenerRequest decode(byte[] msg); + ClientListenerRequest decode(ClientMessage msg); /** * Encode response to byte array. @@ -35,7 +35,7 @@ public interface ClientListenerMessageParser { * @param resp Response. * @return Message. */ - byte[] encode(ClientListenerResponse resp); + ClientMessage encode(ClientListenerResponse resp); /** * Decode command type. Allows to recognize the command (message type) without decoding the entire message. @@ -43,7 +43,7 @@ public interface ClientListenerMessageParser { * @param msg Message. * @return Command type. */ - int decodeCommandType(byte[] msg); + int decodeCommandType(ClientMessage msg); /** * Decode request Id. Allows to recognize the request Id, if any, without decoding the entire message. @@ -51,5 +51,5 @@ public interface ClientListenerMessageParser { * @param msg Message. * @return Request Id. */ - long decodeRequestId(byte[] msg); + long decodeRequestId(ClientMessage msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java index 369eb2d..393383d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java @@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable; /** * Client message listener. */ -public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte[]> { +public class ClientListenerNioListener extends GridNioServerListenerAdapter<ClientMessage> { /** ODBC driver handshake code. */ public static final byte ODBC_CLIENT = 0; @@ -139,7 +139,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte } /** {@inheritDoc} */ - @Override public void onMessage(GridNioSession ses, byte[] msg) { + @Override public void onMessage(GridNioSession ses, ClientMessage msg) { assert msg != null; ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); @@ -214,9 +214,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte ", resp=" + resp.status() + ']'); } - byte[] outMsg = parser.encode(resp); - - GridNioFuture<?> fut = ses.send(outMsg); + GridNioFuture<?> fut = ses.send(parser.encode(resp)); fut.listen(f -> { if (f.error() == null) @@ -289,7 +287,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte * @param ses Session. * @param msg Message bytes. */ - private void onHandshake(GridNioSession ses, byte[] msg) { + private void onHandshake(GridNioSession ses, ClientMessage msg) { BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null); BinaryMarshaller marsh = new BinaryMarshaller(); @@ -298,7 +296,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte ctx.configure(marsh); - BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg), null, true); + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new BinaryHeapInputStream(msg.payload()), null, true); byte cmd = reader.readByte(); @@ -373,7 +371,7 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte writer.writeInt(ClientStatus.FAILED); } - ses.send(writer.array()); + ses.send(new ClientMessage(writer.array())); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java similarity index 56% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java index 48c7367..9c01770 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java @@ -19,14 +19,16 @@ package org.apache.ignite.internal.processors.odbc; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.nio.GridNioParser; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; /** - * This class implements stream parser based on {@link ClientListenerNioServerBuffer}. + * This class implements stream parser. * <p> * The rule for this parser is that every message sent over the stream is prepended with * 4-byte integer header containing message size. So, the stream structure is as follows: @@ -36,45 +38,57 @@ import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ * </pre> */ -public class ClientListenerBufferedParser implements GridNioParser { - /** Buffer metadata key. */ - private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); +public class ClientListenerNioMessageParser implements GridNioParser { + /** Message metadata key. */ + static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - /** {@inheritDoc} */ - @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - ClientListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY); - - // Decode for a given session is called per one thread, so there should not be any concurrency issues. - // However, we make some additional checks. - if (nioBuf == null) { - nioBuf = new ClientListenerNioServerBuffer(); - - ClientListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + /** Reader metadata key. */ + static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - assert old == null; - } + /** */ + private final IgniteLogger log; - return nioBuf.read(buf); + /** */ + public ClientListenerNioMessageParser(IgniteLogger log) { + this.log = log; } /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - byte[] msg0 = (byte[])msg; + @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + Message msg = ses.removeMeta(MSG_META_KEY); + + try { + if (msg == null) + msg = new ClientMessage(); - ByteBuffer res = ByteBuffer.allocate(msg0.length + 4); + boolean finished = false; - res.order(ByteOrder.LITTLE_ENDIAN); + if (buf.hasRemaining()) + finished = msg.readFrom(buf, null); - res.putInt(msg0.length); - res.put(msg0); + if (finished) + return msg; + else { + ses.addMeta(MSG_META_KEY, msg); - res.flip(); + return null; + } + } + catch (Throwable e) { + U.error(log, "Failed to read message [msg=" + msg + + ", buf=" + buf + ", ses=" + ses + "]", e); + + throw e; + } + } - return res; + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public String toString() { - return ClientListenerBufferedParser.class.getSimpleName(); + return ClientListenerNioMessageParser.class.getSimpleName(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java deleted file mode 100644 index ba9d6bf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.odbc; - -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.jetbrains.annotations.Nullable; - -/** - * Client NIO server buffer. - */ -public class ClientListenerNioServerBuffer { - /** Current message data. */ - private byte[] data; - - /** Count of received bytes of the current message. */ - private int cnt = -4; - - /** Current message size. */ - private int msgSize; - - /** - * Reset buffer state. - */ - public void reset() { - msgSize = 0; - cnt = -4; - data = null; - } - - /** - * Checks whether the byte array is filled. - * - * @return Flag indicating whether byte array is filled or not. - */ - public boolean isFilled() { - return cnt > 0 && cnt == msgSize; - } - - /** - * Get data withing the buffer. - * - * @return Data. - */ - public byte[] data() { - return data; - } - - /** - * @param buf Buffer. - * @return Message bytes or {@code null} if message is not fully read yet. - * @throws IgniteCheckedException If failed to parse message. - */ - @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException { - if (cnt < 0) { - for (; cnt < 0 && buf.hasRemaining(); cnt++) - msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt)); - - if (cnt < 0) - return null; - - // If count is 0 then message size should be inited. - if (msgSize <= 0) - throw new IgniteCheckedException("Invalid message size: " + msgSize); - - data = new byte[msgSize]; - } - - assert msgSize > 0; - assert cnt >= 0; - - int remaining = buf.remaining(); - - // If there are more bytes in buffer. - if (remaining > 0) { - int missing = msgSize - cnt; - - // Read only up to message size. - if (missing > 0) { - int len = missing < remaining ? missing : remaining; - - buf.get(data, cnt, len); - - cnt += len; - } - } - - if (cnt == msgSize) { - byte[] data0 = data; - - reset(); - - return data0; - } - - return null; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java index ecebc4a..517d213 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java @@ -86,13 +86,13 @@ public class ClientListenerProcessor extends GridProcessorAdapter { private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); /** Default TCP direct buffer flag. */ - private static final boolean DFLT_TCP_DIRECT_BUF = false; + private static final boolean DFLT_TCP_DIRECT_BUF = true; /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** TCP Server. */ - private GridNioServer<byte[]> srv; + private GridNioServer<ClientMessage> srv; /** Executor service. */ private ExecutorService execSvc; @@ -158,14 +158,16 @@ public class ClientListenerProcessor extends GridProcessorAdapter { long idleTimeout = cliConnCfg.getIdleTimeout(); + int selectorCnt = cliConnCfg.getSelectorCount(); + for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) { try { - GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder() + srv = GridNioServer.<ClientMessage>builder() .address(hostAddr) .port(port) .listener(new ClientListenerNioListener(ctx, busyLock, cliConnCfg)) .logger(log) - .selectorCount(DFLT_SELECTOR_CNT) + .selectorCount(selectorCnt) .igniteInstanceName(ctx.igniteInstanceName()) .serverName("client-listener") .tcpNoDelay(cliConnCfg.isTcpNoDelay()) @@ -174,12 +176,10 @@ public class ClientListenerProcessor extends GridProcessorAdapter { .socketSendBufferSize(cliConnCfg.getSocketSendBufferSize()) .socketReceiveBufferSize(cliConnCfg.getSocketReceiveBufferSize()) .filters(filters) - .directMode(false) + .directMode(true) .idleTimeout(idleTimeout > 0 ? idleTimeout : Long.MAX_VALUE) .build(); - srv = srv0; - ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); if (log.isInfoEnabled()) @@ -287,14 +287,14 @@ public class ClientListenerProcessor extends GridProcessorAdapter { ClientListenerConnectionContext connCtx = ses.meta(ClientListenerNioListener.CONN_CTX_META_KEY); if (connCtx != null && connCtx.parser() != null && connCtx.handler().isCancellationSupported()) { - byte[] inMsg; + ClientMessage inMsg; int cmdType; long reqId; try { - inMsg = (byte[])msg; + inMsg = (ClientMessage)msg; cmdType = connCtx.parser().decodeCommandType(inMsg); @@ -324,7 +324,7 @@ public class ClientListenerProcessor extends GridProcessorAdapter { } }; - GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerBufferedParser(), log, false); + GridNioFilter codecFilter = new GridNioCodecFilter(new ClientListenerNioMessageParser(log), log, true); if (cliConnCfg.isSslEnabled()) { Factory<SSLContext> sslCtxFactory = cliConnCfg.isUseIgniteSslContextFactory() ? @@ -337,7 +337,7 @@ public class ClientListenerProcessor extends GridProcessorAdapter { GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtxFactory.create(), true, ByteOrder.nativeOrder(), log); - sslFilter.directMode(false); + sslFilter.directMode(true); boolean auth = cliConnCfg.isSslClientAuth(); @@ -411,8 +411,7 @@ public class ClientListenerProcessor extends GridProcessorAdapter { private static String clientConnectionDescription( GridNioSession ses, ClientListenerConnectionContext ctx - ) - { + ) { AuthorizationContext authCtx = ctx.authorizationContext(); StringBuilder sb = new StringBuilder(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java new file mode 100644 index 0000000..f1176d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +@IgniteCodeGeneratingFail +public class ClientMessage implements Message, Externalizable { + /** */ + private static final long serialVersionUID = -4609408156037304495L; + + /** */ + private byte[] data; + + /** */ + private BinaryHeapOutputStream stream; + + /** */ + private int cnt = -4; + + /** */ + private int msgSize; + + /** */ + public ClientMessage() {} + + /** */ + public ClientMessage(byte[] data) { + this.data = data; + } + + /** */ + public ClientMessage(BinaryHeapOutputStream stream) { + this.stream = stream; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter ignored) { + assert stream != null || data != null; + + byte[] data = stream != null ? stream.array() : this.data; + int msgSize = stream != null ? stream.position() : data.length; + + if (cnt < 0) { + for (; cnt < 0 && buf.hasRemaining(); cnt++) + buf.put((byte) ((msgSize >> (8 * (4 + cnt))) & 0xFF)); + + if (cnt < 0) + return false; + } + + assert cnt >= 0; + assert msgSize > 0; + + int remaining = buf.remaining(); + + if (remaining > 0) { + int missing = msgSize - cnt; + + if (missing > 0) { + int len = Math.min(missing, remaining); + + buf.put(data, cnt, len); + + cnt += len; + } + } + + if (cnt == msgSize) { + cnt = -4; + + if (stream != null) { + U.closeQuiet(stream); + + stream = null; + } + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (cnt < 0) { + for (; cnt < 0 && buf.hasRemaining(); cnt++) + msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt)); + + if (cnt < 0) + return false; + + data = new byte[msgSize]; + } + + assert data != null; + assert cnt >= 0; + assert msgSize > 0; + + int remaining = buf.remaining(); + + if (remaining > 0) { + int missing = msgSize - cnt; + + if (missing > 0) { + int len = Math.min(missing, remaining); + + buf.get(data, cnt, len); + + cnt += len; + } + } + + if (cnt == msgSize) { + cnt = -4; + msgSize = 0; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return Short.MIN_VALUE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op + } + + /** + * @return Message payload. + */ + public byte[] payload() { + if (stream != null) { + data = stream.arrayCopy(); + U.closeQuiet(stream); + stream = null; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + byte[] data = payload(); + out.writeInt(data.length); + out.write(data); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + data = new byte[in.readInt()]; + in.read(data, 0, data.length); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index 0d9ea39..64cf609 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -216,9 +216,7 @@ public class JdbcConnectionContext extends ClientListenerAbstractConnectionConte if (log.isDebugEnabled()) log.debug("Async response: [resp=" + resp.status() + ']'); - byte[] outMsg = parser.encode(resp); - - ses.send(outMsg); + ses.send(parser.encode(resp)); } } }; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java index fdf7a12..255012b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.processors.odbc.ClientMessage; /** * JDBC message parser. @@ -60,8 +61,8 @@ public class JdbcMessageParser implements ClientListenerMessageParser { * @param msg Message. * @return Reader. */ - protected BinaryReaderExImpl createReader(byte[] msg) { - BinaryInputStream stream = new BinaryHeapInputStream(msg); + protected BinaryReaderExImpl createReader(ClientMessage msg) { + BinaryInputStream stream = new BinaryHeapInputStream(msg.payload()); return new BinaryReaderExImpl(binCtx, stream, ctx.config().getClassLoader(), true); } @@ -76,7 +77,7 @@ public class JdbcMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public ClientListenerRequest decode(byte[] msg) { + @Override public ClientListenerRequest decode(ClientMessage msg) { assert msg != null; BinaryReaderExImpl reader = createReader(msg); @@ -85,7 +86,7 @@ public class JdbcMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public byte[] encode(ClientListenerResponse msg) { + @Override public ClientMessage encode(ClientListenerResponse msg) { assert msg != null; assert msg instanceof JdbcResponse; @@ -96,20 +97,20 @@ public class JdbcMessageParser implements ClientListenerMessageParser { res.writeBinary(writer, protoCtx); - return writer.array(); + return new ClientMessage(writer.array()); } /** {@inheritDoc} */ - @Override public int decodeCommandType(byte[] msg) { + @Override public int decodeCommandType(ClientMessage msg) { assert msg != null; - return JdbcRequest.readType(msg); + return JdbcRequest.readType(msg.payload()); } /** {@inheritDoc} */ - @Override public long decodeRequestId(byte[] msg) { + @Override public long decodeRequestId(ClientMessage msg) { assert msg != null; - return JdbcRequest.readRequestId(msg); + return JdbcRequest.readRequestId(msg.payload()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java index 0cc22d1..74def0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java @@ -157,9 +157,7 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte if (log.isDebugEnabled()) log.debug("Async response: [resp=" + resp.status() + ']'); - byte[] outMsg = parser.encode(resp); - - ses.send(outMsg); + ses.send(parser.encode(resp)); } } }; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index 0e66c93..c9b779f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.processors.odbc.ClientMessage; import org.apache.ignite.internal.processors.odbc.SqlListenerUtils; import org.jetbrains.annotations.NotNull; @@ -78,10 +79,10 @@ public class OdbcMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public ClientListenerRequest decode(byte[] msg) { + @Override public ClientListenerRequest decode(ClientMessage msg) { assert msg != null; - BinaryInputStream stream = new BinaryHeapInputStream(msg); + BinaryInputStream stream = new BinaryHeapInputStream(msg.payload()); BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true); @@ -242,7 +243,7 @@ public class OdbcMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public byte[] encode(ClientListenerResponse msg0) { + @Override public ClientMessage encode(ClientListenerResponse msg0) { assert msg0 != null; assert msg0 instanceof OdbcResponse; @@ -263,13 +264,13 @@ public class OdbcMessageParser implements ClientListenerMessageParser { if (msg.status() != ClientListenerResponse.STATUS_SUCCESS) { writer.writeString(msg.error()); - return writer.array(); + return new ClientMessage(writer.array()); } Object res0 = msg.response(); if (res0 == null) - return writer.array(); + return new ClientMessage(writer.array()); else if (res0 instanceof OdbcQueryExecuteResult) { OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0; @@ -406,19 +407,19 @@ public class OdbcMessageParser implements ClientListenerMessageParser { else assert false : "Should not reach here."; - return writer.array(); + return new ClientMessage(writer.array()); } /** {@inheritDoc} */ - @Override public int decodeCommandType(byte[] msg) { + @Override public int decodeCommandType(ClientMessage msg) { assert msg != null; - return msg[0]; + return msg.payload()[0]; } /** {@inheritDoc} */ - @Override public long decodeRequestId(byte[] msg) { + @Override public long decodeRequestId(ClientMessage msg) { return 0; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index d3b10c5..66b9e89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -23,10 +23,12 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.processors.odbc.ClientMessage; import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest; import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest; import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNamePutRequest; @@ -284,10 +286,10 @@ public class ClientMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public ClientListenerRequest decode(byte[] msg) { + @Override public ClientListenerRequest decode(ClientMessage msg) { assert msg != null; - BinaryInputStream inStream = new BinaryHeapInputStream(msg); + BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload()); // skipHdrCheck must be true (we have 103 op code). BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), inStream, @@ -473,10 +475,10 @@ public class ClientMessageParser implements ClientListenerMessageParser { } /** {@inheritDoc} */ - @Override public byte[] encode(ClientListenerResponse resp) { + @Override public ClientMessage encode(ClientListenerResponse resp) { assert resp != null; - BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32); + BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32, BinaryMemoryAllocator.POOLED.chunk()); BinaryRawWriterEx writer = marsh.writer(outStream); @@ -484,20 +486,20 @@ public class ClientMessageParser implements ClientListenerMessageParser { ((ClientOutgoingMessage)resp).encode(ctx, writer); - return outStream.arrayCopy(); + return new ClientMessage(outStream); } /** {@inheritDoc} */ - @Override public int decodeCommandType(byte[] msg) { + @Override public int decodeCommandType(ClientMessage msg) { assert msg != null; - BinaryInputStream inStream = new BinaryHeapInputStream(msg); + BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload()); return inStream.readShort(); } /** {@inheritDoc} */ - @Override public long decodeRequestId(byte[] msg) { + @Override public long decodeRequestId(ClientMessage msg) { return 0; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java index a914078..b3c8cda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; -import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK; import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN; import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java index 93f1a95..a165a07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; -import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK; import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN; import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 9c0c6b1..41574ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1401,16 +1401,7 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - - if (writer == null) { - try { - ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to create message writer.", e); - } - } + MessageWriter writer = messageWriter(ses); boolean handshakeFinished = sslFilter.lock(ses); @@ -1659,16 +1650,7 @@ public class GridNioServer<T> { ByteBuffer buf = ses.writeBuffer(); SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal()); - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - - if (writer == null) { - try { - ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to create message writer.", e); - } - } + MessageWriter writer = messageWriter(ses); if (req == null) { req = systemMessage(ses); @@ -1739,6 +1721,25 @@ public class GridNioServer<T> { buf.clear(); } + /** */ + @Nullable private MessageWriter messageWriter(GridSelectorNioSessionImpl ses) throws IOException { + if (writerFactory == null) + return null; + + MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); + + if (writer == null) { + try { + ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to create message writer.", e); + } + } + + return writer; + } + /** * @param writer Customizer of writing. * @param buf Buffer to write. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index 8e6c966..1c92e8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -107,7 +107,7 @@ import org.junit.Assert; import org.junit.Test; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE; +import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.THREAD_LOCAL; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotEquals; @@ -2834,28 +2834,28 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { @Test public void testThreadLocalArrayReleased() throws Exception { // Checking the writer directly. - assertEquals(false, INSTANCE.isAcquired()); + assertEquals(false, THREAD_LOCAL.isAcquired()); BinaryMarshaller marsh = binaryMarshaller(); try (BinaryWriterExImpl writer = new BinaryWriterExImpl(binaryContext(marsh))) { - assertEquals(true, INSTANCE.isAcquired()); + assertEquals(true, THREAD_LOCAL.isAcquired()); writer.writeString("Thread local test"); writer.array(); - assertEquals(true, INSTANCE.isAcquired()); + assertEquals(true, THREAD_LOCAL.isAcquired()); } // Checking the binary marshaller. - assertEquals(false, INSTANCE.isAcquired()); + assertEquals(false, THREAD_LOCAL.isAcquired()); marsh = binaryMarshaller(); marsh.marshal(new SimpleObject()); - assertEquals(false, INSTANCE.isAcquired()); + assertEquals(false, THREAD_LOCAL.isAcquired()); marsh = binaryMarshaller(); @@ -2867,7 +2867,7 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { BinaryObject binaryObj = builder.build(); - assertEquals(false, INSTANCE.isAcquired()); + assertEquals(false, THREAD_LOCAL.isAcquired()); } /**