IGNITE-3060 Discovery: optimize resource usage for client connections
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b0edfbd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b0edfbd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b0edfbd Branch: refs/heads/ignite-3163 Commit: 7b0edfbdf1790dbb7a4f935d486bc0af5e5e2a27 Parents: 84b2fdf Author: sboikov <[email protected]> Authored: Wed Apr 27 15:52:18 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Apr 27 15:52:18 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteComponentType.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 9 + .../ignite/spi/discovery/tcp/ServerImpl.java | 168 ++++++++++++------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +- .../TcpDiscoveryClientHeartbeatMessage.java | 1 + .../tcp/TcpClientDiscoverySpiSelfTest.java | 44 ++++- 6 files changed, 160 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 01872b6..76e495f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -285,7 +285,7 @@ public enum IgniteComponentType { return (T)ctor.newInstance(ctx); } } - catch (Exception e) { + catch (Throwable e) { throw componentException(e); } } @@ -309,7 +309,7 @@ public enum IgniteComponentType { * @param err Creation error. * @return Component creation exception. */ - private IgniteCheckedException componentException(Exception err) { + private IgniteCheckedException componentException(Throwable err) { return new IgniteCheckedException("Failed to create Ignite component (consider adding " + module + " module to classpath) [component=" + this + ", cls=" + clsName + ']', err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index a46a526..d0a3722 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -638,6 +638,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collection<CacheContinuousQueryEntry> entries) { final GridCacheContext cctx = cacheContext(ctx); + if (cctx == null) { + IgniteLogger log = ctx.log(CacheContinuousQueryHandler.class); + + if (log.isDebugEnabled()) + log.debug("Failed to notify callback, cache is not found: " + cacheId); + + return; + } + final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size()); for (CacheContinuousQueryEntry e : entries) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b082ba2..ab2f6b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -160,7 +160,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @SuppressWarnings("All") class ServerImpl extends TcpDiscoveryImpl { /** */ - private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10); + private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024); /** */ private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE = @@ -1398,6 +1398,8 @@ class ServerImpl extends TcpDiscoveryImpl { ", topSize=" + ring.allNodes().size() + ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + + ", clients=" + ring.clientNodes().size() + + ", clientWorkers=" + clientMsgWorkers.size() + ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") + ", heapFree=" + runtime.freeMemory() / (1024 * 1024) + "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]"); @@ -2101,7 +2103,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Message worker thread for messages processing. */ - private class RingMessageWorker extends MessageWorkerAdapter { + private class RingMessageWorker extends MessageWorkerAdapter<TcpDiscoveryAbstractMessage> { /** Next node. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private TcpDiscoveryNode next; @@ -2159,6 +2161,31 @@ class ServerImpl extends TcpDiscoveryImpl { initConnectionCheckFrequency(); } + /** + * Adds message to queue. + * + * @param msg Message to add. + */ + void addMessage(TcpDiscoveryAbstractMessage msg) { + if ((msg instanceof TcpDiscoveryStatusCheckMessage || + msg instanceof TcpDiscoveryJoinRequestMessage || + msg instanceof TcpDiscoveryCustomEventMessage) && + queue.contains(msg)) { + if (log.isDebugEnabled()) + log.debug("Ignoring duplicate message: " + msg); + + return; + } + + if (msg.highPriority()) + queue.addFirst(msg); + else + queue.add(msg); + + if (log.isDebugEnabled()) + log.debug("Message has been added to queue: " + msg); + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { @@ -2320,28 +2347,44 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (redirectToClients(msg)) { - byte[] marshalledMsg = null; + byte[] msgBytes = null; for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { - // Send a clone to client to avoid ConcurrentModificationException - TcpDiscoveryAbstractMessage msgClone; - - try { - if (marshalledMsg == null) - marshalledMsg = spi.marsh.marshal(msg); + if (msgBytes == null) { + try { + msgBytes = spi.marsh.marshal(msg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message: " + msg, e); - msgClone = spi.marsh.unmarshal(marshalledMsg, - U.resolveClassLoader(spi.ignite().configuration())); + break; + } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message: " + msg, e); - msgClone = msg; - } + TcpDiscoveryAbstractMessage msg0 = msg; + byte[] msgBytes0 = msgBytes; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + TcpDiscoveryNode node = nodeAddedMsg.node(); + + if (clientMsgWorker.clientNodeId.equals(node.id())) { + try { + msg0 = spi.marsh.unmarshal(msgBytes, + U.resolveClassLoader(spi.ignite().configuration())); - prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null); + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null); - clientMsgWorker.addMessage(msgClone); + msgBytes0 = null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to create message copy: " + msg, e); + } + } + } + + clientMsgWorker.addMessage(msg0, msgBytes0); } } } @@ -5599,16 +5642,13 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ - private class ClientMessageWorker extends MessageWorkerAdapter { + private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage, byte[]>> { /** Node ID. */ private final UUID clientNodeId; /** Socket. */ private final Socket sock; - /** Output stream. */ - private final OutputStream out; - /** Current client metrics. */ private volatile ClusterMetrics metrics; @@ -5627,8 +5667,6 @@ class ServerImpl extends TcpDiscoveryImpl { this.sock = sock; this.clientNodeId = clientNodeId; - - out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); } /** @@ -5652,11 +5690,43 @@ class ServerImpl extends TcpDiscoveryImpl { this.metrics = metrics; } + /** + * @param msg Message. + */ + public void addMessage(TcpDiscoveryAbstractMessage msg) { + addMessage(msg, null); + } + + /** + * @param msg Message. + * @param msgBytes Optional message bytes. + */ + public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { + T2 t = new T2<>(msg, msgBytes); + + if (msg.highPriority()) + queue.addFirst(t); + else + queue.add(t); + + if (log.isDebugEnabled()) + log.debug("Message has been added to client queue: " + msg); + } + /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) { + boolean success = false; + + TcpDiscoveryAbstractMessage msg = msgT.get1(); + try { assert msg.verified() : msg; + byte[] msgBytes = msgT.get2(); + + if (msgBytes == null) + msgBytes = spi.marsh.marshal(msg); + if (msg instanceof TcpDiscoveryClientAckResponse) { if (clientVer == null) { ClusterNode node = spi.getNode(clientNodeId); @@ -5675,7 +5745,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5686,9 +5756,11 @@ class ServerImpl extends TcpDiscoveryImpl { assert topologyInitialized(msg) : msg; - spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } + + success = true; } catch (IgniteCheckedException | IOException e) { if (log.isDebugEnabled()) @@ -5697,12 +5769,15 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Client connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e); + } + finally { + if (!success) { + clientMsgWorkers.remove(clientNodeId, this); - clientMsgWorkers.remove(clientNodeId, this); - - U.interrupt(this); + U.interrupt(this); - U.closeQuiet(sock); + U.closeQuiet(sock); + } } } @@ -5792,9 +5867,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Base class for message workers. */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { + protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread { /** Message queue. */ - private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); + protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>(); /** Backed interrupted flag. */ private volatile boolean interrupted; @@ -5820,7 +5895,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); + T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); if (msg == null) noMessageLoop(); @@ -5849,34 +5924,9 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - if ((msg instanceof TcpDiscoveryStatusCheckMessage || - msg instanceof TcpDiscoveryJoinRequestMessage || - msg instanceof TcpDiscoveryCustomEventMessage) && - queue.contains(msg)) { - if (log.isDebugEnabled()) - log.debug("Ignoring duplicate message: " + msg); - - return; - } - - if (msg.highPriority()) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - /** * @param msg Message. */ - protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); + protected abstract void processMessage(T msg); /** * Called when there is no message to process giving ability to perform other activity. http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index abe380c..73b541f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1265,7 +1265,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout)); - writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); + writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout)); return sock; } @@ -1295,12 +1295,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * Writes message to the socket. * * @param sock Socket. + * @param msg Message. * @param data Raw data to write. * @param timeout Socket write timeout. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException { + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException { assert sock != null; assert data != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java index 37b578c..3993de0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java @@ -38,6 +38,7 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess * Constructor. * * @param creatorNodeId Creator node. + * @param metrics Metrics. */ public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index e01094c..331b581 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -1736,8 +1736,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final AtomicBoolean err = new AtomicBoolean(false); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { log.info("Disconnected event."); @@ -2158,17 +2157,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeToSocket(Socket sock, + OutputStream out, + TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); + if (!onMessage(sock, msg)) + return; + + super.writeToSocket(sock, out, msg, timeout); + + if (afterWrite != null) + afterWrite.apply(msg, sock); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] msgBytes, + long timeout) throws IOException { + waitFor(writeLock); + + if (!onMessage(sock, msg)) + return; + + super.writeToSocket(sock, msg, msgBytes, timeout); + + if (afterWrite != null) + afterWrite.apply(msg, sock); + } + + /** + * @param sock Socket. + * @param msg Message. + * @return {@code False} if should not further process message. + * @throws IOException If failed. + */ + private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException { boolean fail = false; if (skipNodeAdded && (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) { log.info("Skip message: " + msg); - return; + return false; } if (msg instanceof TcpDiscoveryNodeAddedMessage) @@ -2184,10 +2215,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { sock.close(); } - super.writeToSocket(sock, out, msg, timeout); - - if (afterWrite != null) - afterWrite.apply(msg, sock); + return true; } /** {@inheritDoc} */
