Merge master into ignite-843
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d145cb70 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d145cb70 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d145cb70 Branch: refs/heads/ignite-843 Commit: d145cb7093c53069c514fcf6002f65f43ad2542e Parents: c01723f 6844370 Author: Andrey <anovi...@gridgain.com> Authored: Tue Oct 13 09:23:10 2015 +0700 Committer: Andrey <anovi...@gridgain.com> Committed: Tue Oct 13 09:23:19 2015 +0700 ---------------------------------------------------------------------- .../src/main/js/views/sql/chart-settings.jade | 2 +- .../org/apache/ignite/IgniteTransactions.java | 4 - .../processors/cache/GridCacheIoManager.java | 93 +- .../processors/cache/GridCacheProcessor.java | 23 +- .../dht/GridClientPartitionTopology.java | 36 +- .../distributed/dht/GridDhtLocalPartition.java | 12 +- .../dht/GridDhtPartitionTopologyImpl.java | 28 +- .../GridDhtPartitionDemandMessage.java | 4 +- .../GridDhtPartitionSupplyMessage.java | 3 +- .../GridDhtPartitionsExchangeFuture.java | 70 +- .../preloader/GridDhtPartitionsFullMessage.java | 12 +- .../GridDhtPartitionsSingleMessage.java | 11 +- .../dht/preloader/GridDhtPreloader.java | 34 +- .../communication/tcp/TcpCommunicationSpi.java | 1394 +++++++++--------- .../IgniteCacheConfigurationTemplateTest.java | 41 +- .../dht/GridCacheDhtPreloadPerformanceTest.java | 133 ++ .../near/GridCacheNearTxForceKeyTest.java | 2 +- 17 files changed, 995 insertions(+), 907 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/control-center-web/src/main/js/views/sql/chart-settings.jade ---------------------------------------------------------------------- diff --cc modules/control-center-web/src/main/js/views/sql/chart-settings.jade index 4b350a4,0000000..c1bedd1 mode 100644,000000..100644 --- a/modules/control-center-web/src/main/js/views/sql/chart-settings.jade +++ b/modules/control-center-web/src/main/js/views/sql/chart-settings.jade @@@ -1,38 -1,0 +1,38 @@@ +//- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + - .popover.settings(tabindex='-1' style='width: 300px') ++.popover.settings(tabindex='-1') + .arrow + h3.popover-title(style='color: black') Chart settings + button.close(id='chart-settings-close' ng-click='$hide()') × + .popover-content + form.form-horizontal.chart-settings(name='chartSettingsForm' novalidate) + .form-group.chart-settings + label All columns (drag columns to axis) + ul.chart-settings-columns-list(dnd-list='paragraph.chartColumns' dnd-allowed-types='[]') + li(ng-repeat='col in paragraph.chartColumns track by $index') + .btn.btn-default.btn-chart-column-movable(dnd-draggable='col' dnd-effect-allowed='copy') {{col.label}} + label X axis (accept only one column) + ul.chart-settings-columns-list(dnd-list='paragraph.chartKeyCols' dnd-drop='chartAcceptKeyColumn(paragraph, item)') + li(ng-repeat='col in paragraph.chartKeyCols track by $index') + .btn.btn-info.btn-chart-column {{col.label}} + i.fa.fa-close(ng-click='chartRemoveKeyColumn(paragraph, $index)') + label Y axis (accept only numeric columns) + ul.chart-settings-columns-list(dnd-list='paragraph.chartValCols' dnd-drop='chartAcceptValColumn(paragraph, item)') + li(ng-repeat='col in paragraph.chartValCols track by $index') + .btn.btn-success.btn-chart-column {{col.label}} + i.fa.fa-close(ng-click='chartRemoveValColumn(paragraph, $index)') + http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index c81dae5,476a96c..85ebe94 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@@ -77,33 -75,30 +75,22 @@@ import static org.apache.ignite.interna public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message ID generator. */ private static final AtomicLong idGen = new AtomicLong(); -- ++ /** Mutex. */ ++ private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); /** Delay in milliseconds between retries. */ private long retryDelay; -- /** Number of retries using to send messages. */ private int retryCnt; -- /** Indexed class handlers. */ private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); -- /** Handler registry. */ private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new ConcurrentHashMap8<>(); -- /** Ordered handler registry. */ private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = new ConcurrentHashMap8<>(); -- /** Stopping flag. */ private boolean stopping; - - /** Error flag. */ - private final AtomicBoolean startErr = new AtomicBoolean(); -- -- /** Mutex. */ -- private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); -- /** Deployment enabled. */ private boolean depEnabled; @@@ -982,32 -981,32 +973,6 @@@ } /** -- * Ordered message listener. -- */ -- private class OrderedMessageListener implements GridMessageListener { -- /** */ -- private final IgniteBiInClosure<UUID, GridCacheMessage> c; -- -- /** -- * @param c Handler closure. -- */ -- OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) { -- this.c = c; -- } -- -- /** {@inheritDoc} */ -- @SuppressWarnings({"CatchGenericClass", "unchecked"}) -- @Override public void onMessage(final UUID nodeId, Object msg) { -- if (log.isDebugEnabled()) -- log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); -- -- final GridCacheMessage cacheMsg = (GridCacheMessage)msg; -- -- onMessage0(nodeId, cacheMsg, c); -- } -- } -- -- /** * */ private static class ListenerKey { @@@ -1048,4 -1047,4 +1013,30 @@@ return res; } } ++ ++ /** ++ * Ordered message listener. ++ */ ++ private class OrderedMessageListener implements GridMessageListener { ++ /** */ ++ private final IgniteBiInClosure<UUID, GridCacheMessage> c; ++ ++ /** ++ * @param c Handler closure. ++ */ ++ OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) { ++ this.c = c; ++ } ++ ++ /** {@inheritDoc} */ ++ @SuppressWarnings({"CatchGenericClass", "unchecked"}) ++ @Override public void onMessage(final UUID nodeId, Object msg) { ++ if (log.isDebugEnabled()) ++ log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); ++ ++ final GridCacheMessage cacheMsg = (GridCacheMessage)msg; ++ ++ onMessage0(nodeId, cacheMsg, c); ++ } ++ } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5e3cc0b,162c116..a62ea3d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -60,40 -60,40 +60,29 @@@ public class GridClientPartitionTopolog /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; -- ++ /** Logger. */ ++ private final IgniteLogger log; ++ /** */ ++ private final GridAtomicLong updateSeq = new GridAtomicLong(1); ++ /** Lock. */ ++ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** Cache shared context. */ private GridCacheSharedContext cctx; -- /** Cache ID. */ private int cacheId; -- -- /** Logger. */ -- private final IgniteLogger log; -- /** Node to partition map. */ private GridDhtPartitionFullMap node2part; -- /** Partition to node map. */ private Map<Integer, Set<UUID>> part2node = new HashMap<>(); -- /** */ private GridDhtPartitionExchangeId lastExchangeId; -- /** */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; -- /** */ private volatile boolean stopping; -- /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; -- /** */ -- private final GridAtomicLong updateSeq = new GridAtomicLong(1); -- -- /** Lock. */ -- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); -- /** * @param cctx Context. * @param cacheId Cache ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 2deabfe,4f124e6..06272ac --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@@ -100,21 -100,21 +100,16 @@@ public class GridDhtLocalPartition impl /** Create time. */ @GridToStringExclude private final long createTime = U.currentTimeMillis(); -- -- /** Eviction history. */ -- private volatile Map<KeyCacheObject, GridCacheVersion> evictHist = new HashMap<>(); -- /** Lock. */ private final ReentrantLock lock = new ReentrantLock(); -- /** Public size counter. */ private final LongAdder8 mapPubSize = new LongAdder8(); -- /** Remove queue. */ private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; -- /** Group reservations. */ private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); ++ /** Eviction history. */ ++ private volatile Map<KeyCacheObject, GridCacheVersion> evictHist = new HashMap<>(); /** * @param cctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index a0c9c88,6bd283a..ba76eca --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -77,31 -77,31 +77,23 @@@ class GridDhtPartitionTopologyImpl impl /** */ private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts = new ConcurrentHashMap8<>(); -- ++ /** */ ++ private final GridAtomicLong updateSeq = new GridAtomicLong(1); ++ /** Lock. */ ++ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** Node to partition map. */ private GridDhtPartitionFullMap node2part; -- /** Partition to node map. */ private Map<Integer, Set<UUID>> part2node = new HashMap<>(); -- /** */ private GridDhtPartitionExchangeId lastExchangeId; -- /** */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; -- /** */ private volatile boolean stopping; -- /** A future that will be completed when topology with version topVer will be ready to use. */ private GridDhtTopologyFuture topReadyFut; -- /** */ -- private final GridAtomicLong updateSeq = new GridAtomicLong(1); -- -- /** Lock. */ -- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); -- /** * @param cctx Context. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a1b03c1,77e47a7..7230393 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -101,83 -101,83 +101,61 @@@ public class GridDhtPartitionsExchangeF /** Dummy reassign flag. */ private final boolean reassign; -- -- /** Discovery event. */ -- private volatile DiscoveryEvent discoEvt; -- /** */ @GridToStringInclude private final Collection<UUID> rcvdIds = new GridConcurrentHashSet<>(); -- -- /** Remote nodes. */ -- private volatile Collection<ClusterNode> rmtNodes; -- -- /** Remote nodes. */ -- @GridToStringInclude -- private volatile Collection<UUID> rmtIds; -- /** Oldest node. */ @GridToStringExclude private final AtomicReference<ClusterNode> oldestNode = new AtomicReference<>(); -- /** ExchangeFuture id. */ private final GridDhtPartitionExchangeId exchId; -- /** Init flag. */ @GridToStringInclude private final AtomicBoolean init = new AtomicBoolean(false); -- /** Ready for reply flag. */ @GridToStringInclude private final AtomicBoolean ready = new AtomicBoolean(false); -- /** Replied flag. */ @GridToStringInclude private final AtomicBoolean replied = new AtomicBoolean(false); -- ++ /** Cache context. */ ++ private final GridCacheSharedContext<?, ?> cctx; ++ /** ++ * Messages received on non-coordinator are stored in case if this node ++ * becomes coordinator. ++ */ ++ private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>(); ++ /** Messages received from new coordinator. */ ++ private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>(); ++ /** */ ++ private final Object mux = new Object(); ++ /** Discovery event. */ ++ private volatile DiscoveryEvent discoEvt; ++ /** Remote nodes. */ ++ private volatile Collection<ClusterNode> rmtNodes; ++ /** Remote nodes. */ ++ @GridToStringInclude ++ private volatile Collection<UUID> rmtIds; /** Timeout object. */ @GridToStringExclude private volatile GridTimeoutObject timeoutObj; -- -- /** Cache context. */ -- private final GridCacheSharedContext<?, ?> cctx; -- /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ private ReadWriteLock busyLock; -- /** */ private AtomicBoolean added = new AtomicBoolean(false); -- /** Event latch. */ @GridToStringExclude private CountDownLatch evtLatch = new CountDownLatch(1); -- /** */ private GridFutureAdapter<Boolean> initFut; -- /** Topology snapshot. */ private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); -- /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); -- -- /** -- * Messages received on non-coordinator are stored in case if this node -- * becomes coordinator. -- */ -- private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>(); -- -- /** Messages received from new coordinator. */ -- private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>(); -- /** */ @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) @GridToStringInclude private volatile IgniteInternalFuture<?> partReleaseFut; -- -- /** */ -- private final Object mux = new Object(); -- /** Logger. */ private IgniteLogger log; http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 19b461e,74237f8..df1e10b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@@ -69,28 -70,28 +70,20 @@@ import static org.apache.ignite.interna public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; -- -- /** */ -- private GridDhtPartitionTopology top; -- /** Topology version. */ private final GridAtomicLong topVer = new GridAtomicLong(); -- /** Force key futures. */ private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); -- ++ /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ ++ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); ++ /** */ ++ private GridDhtPartitionTopology top; /** Partition suppliers. */ private GridDhtPartitionSupplyPool supplyPool; -- /** Partition demanders. */ private GridDhtPartitionDemandPool demandPool; -- /** Start future. */ private GridFutureAdapter<Object> startFut; -- -- /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ -- private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); -- /** Pending affinity assignment futures. */ private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c93d5af,5ea2c02..0e87158 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@@ -281,47 -281,47 +281,125 @@@ public class TcpCommunicationSpi extend * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}. */ public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); -- -- /** Node ID meta for session. */ -- private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey(); -- -- /** Message tracker meta for session. */ -- private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); -- /** * Default local port range (value is <tt>100</tt>). * See {@link #setLocalPortRange(int)} for details. */ public static final int DFLT_PORT_RANGE = 100; -- /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ public static final boolean DFLT_TCP_NODELAY = true; -- /** Default received messages threshold for sending ack. */ public static final int DFLT_ACK_SND_THRESHOLD = 16; -- /** Default socket write timeout. */ public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; -- ++ /** Node ID message type. */ ++ public static final byte NODE_ID_MSG_TYPE = -1; ++ /** */ ++ public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2; ++ /** */ ++ public static final byte HANDSHAKE_MSG_TYPE = -3; ++ /** Node ID meta for session. */ ++ private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey(); ++ /** Message tracker meta for session. */ ++ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @Override public void run() { // No-op. } }; ++ /** Shared memory workers. */ ++ private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); ++ /** Clients. */ ++ private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); ++ /** Received messages count. */ ++ private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); ++ /** Sent messages count.*/ ++ private final LongAdder8 sentMsgsCnt = new LongAdder8(); ++ /** Received bytes count. */ ++ private final LongAdder8 rcvdBytesCnt = new LongAdder8(); ++ /** Sent bytes count.*/ ++ private final LongAdder8 sentBytesCnt = new LongAdder8(); ++ /** Context initialization latch. */ ++ private final CountDownLatch ctxInitLatch = new CountDownLatch(1); ++ /** metrics listener. */ ++ private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() { ++ @Override public void onBytesSent(int bytesCnt) { ++ sentBytesCnt.add(bytesCnt); ++ } -- /** Node ID message type. */ -- public static final byte NODE_ID_MSG_TYPE = -1; -- -- /** */ -- public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2; -- ++ @Override public void onBytesReceived(int bytesCnt) { ++ rcvdBytesCnt.add(bytesCnt); ++ } ++ }; ++ /** Client connect futures. */ ++ private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts = ++ GridConcurrentFactory.newMap(); /** */ -- public static final byte HANDSHAKE_MSG_TYPE = -3; -- ++ private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); /** */ private ConnectGateway connectGate; ++ /** Logger. */ ++ @LoggerResource ++ private IgniteLogger log; ++ /** Discovery listener. */ ++ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { ++ @Override public void onEvent(Event evt) { ++ assert evt instanceof DiscoveryEvent : evt; ++ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; ++ onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); ++ } ++ }; ++ /** Local IP address. */ ++ private String locAddr; ++ /** Complex variable that represents this node IP address. */ ++ private volatile InetAddress locHost; ++ /** Local port which node uses. */ ++ private int locPort = DFLT_PORT; ++ /** Local port range. */ ++ private int locPortRange = DFLT_PORT_RANGE; ++ /** Local port which node uses to accept shared memory connections. */ ++ private int shmemPort = DFLT_SHMEM_PORT; ++ /** Allocate direct buffer or heap buffer. */ ++ private boolean directBuf = true; ++ /** Allocate direct buffer or heap buffer. */ ++ private boolean directSndBuf; ++ /** Idle connection timeout. */ ++ private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; ++ /** Connect timeout. */ ++ private long connTimeout = DFLT_CONN_TIMEOUT; ++ /** Maximum connect timeout. */ ++ private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT; ++ /** Reconnect attempts count. */ ++ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) ++ private int reconCnt = DFLT_RECONNECT_CNT; ++ /** Socket send buffer. */ ++ private int sockSndBuf = DFLT_SOCK_BUF_SIZE; ++ /** Socket receive buffer. */ ++ private int sockRcvBuf = DFLT_SOCK_BUF_SIZE; ++ /** Message queue limit. */ ++ private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; ++ /** Slow client queue limit. */ ++ private int slowClientQueueLimit; ++ /** NIO server. */ ++ private GridNioServer<Message> nioSrvr; ++ /** Shared memory server. */ ++ private IpcSharedMemoryServerEndpoint shmemSrv; ++ /** {@code TCP_NODELAY} option value for created sockets. */ ++ private boolean tcpNoDelay = DFLT_TCP_NODELAY; ++ /** Number of received messages after which acknowledgment is sent. */ ++ private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; ++ /** Maximum number of unacknowledged messages. */ ++ private int unackedMsgsBufSize; ++ /** Socket write timeout. */ ++ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; ++ /** Recovery and idle clients handler. */ ++ private CommunicationWorker commWorker; ++ /** Shared memory accept worker. */ ++ private ShmemAcceptWorker shmemAcceptWorker; ++ /** SPI listener. */ ++ private volatile CommunicationListener<Message> lsnr; /** Server listener. */ private final GridNioServerListener<Message> srvLsnr = new GridNioServerListenerAdapter<Message>() { @@@ -724,145 -724,145 +802,15 @@@ } } }; -- -- /** Logger. */ -- @LoggerResource -- private IgniteLogger log; -- -- /** Local IP address. */ -- private String locAddr; -- -- /** Complex variable that represents this node IP address. */ -- private volatile InetAddress locHost; -- -- /** Local port which node uses. */ -- private int locPort = DFLT_PORT; -- -- /** Local port range. */ -- private int locPortRange = DFLT_PORT_RANGE; -- -- /** Local port which node uses to accept shared memory connections. */ -- private int shmemPort = DFLT_SHMEM_PORT; -- -- /** Allocate direct buffer or heap buffer. */ -- private boolean directBuf = true; -- -- /** Allocate direct buffer or heap buffer. */ -- private boolean directSndBuf; -- -- /** Idle connection timeout. */ -- private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; -- -- /** Connect timeout. */ -- private long connTimeout = DFLT_CONN_TIMEOUT; -- -- /** Maximum connect timeout. */ -- private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT; -- -- /** Reconnect attempts count. */ -- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) -- private int reconCnt = DFLT_RECONNECT_CNT; -- -- /** Socket send buffer. */ -- private int sockSndBuf = DFLT_SOCK_BUF_SIZE; -- -- /** Socket receive buffer. */ -- private int sockRcvBuf = DFLT_SOCK_BUF_SIZE; -- -- /** Message queue limit. */ -- private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; -- -- /** Slow client queue limit. */ -- private int slowClientQueueLimit; -- -- /** NIO server. */ -- private GridNioServer<Message> nioSrvr; -- -- /** Shared memory server. */ -- private IpcSharedMemoryServerEndpoint shmemSrv; -- -- /** {@code TCP_NODELAY} option value for created sockets. */ -- private boolean tcpNoDelay = DFLT_TCP_NODELAY; -- -- /** Number of received messages after which acknowledgment is sent. */ -- private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; -- -- /** Maximum number of unacknowledged messages. */ -- private int unackedMsgsBufSize; -- -- /** Socket write timeout. */ -- private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; -- -- /** Recovery and idle clients handler. */ -- private CommunicationWorker commWorker; -- -- /** Shared memory accept worker. */ -- private ShmemAcceptWorker shmemAcceptWorker; -- -- /** Shared memory workers. */ -- private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); -- -- /** Clients. */ -- private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); -- -- /** SPI listener. */ -- private volatile CommunicationListener<Message> lsnr; -- /** Bound port. */ private int boundTcpPort = -1; -- /** Bound port for shared memory server. */ private int boundTcpShmemPort = -1; -- /** Count of selectors to use in TCP server. */ private int selectorsCnt = DFLT_SELECTORS_CNT; -- /** Address resolver. */ private AddressResolver addrRslvr; -- /** Received messages count. */ -- private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); -- -- /** Sent messages count.*/ -- private final LongAdder8 sentMsgsCnt = new LongAdder8(); -- -- /** Received bytes count. */ -- private final LongAdder8 rcvdBytesCnt = new LongAdder8(); -- -- /** Sent bytes count.*/ -- private final LongAdder8 sentBytesCnt = new LongAdder8(); -- -- /** Context initialization latch. */ -- private final CountDownLatch ctxInitLatch = new CountDownLatch(1); -- -- /** metrics listener. */ -- private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() { -- @Override public void onBytesSent(int bytesCnt) { -- sentBytesCnt.add(bytesCnt); -- } -- -- @Override public void onBytesReceived(int bytesCnt) { -- rcvdBytesCnt.add(bytesCnt); -- } -- }; -- -- /** Client connect futures. */ -- private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts = -- GridConcurrentFactory.newMap(); -- -- /** */ -- private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); -- -- /** Discovery listener. */ -- private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { -- @Override public void onEvent(Event evt) { -- assert evt instanceof DiscoveryEvent : evt; -- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; -- -- onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); -- } -- }; -- /** * @return {@code True} if ssl enabled. */ @@@ -897,6 -897,6 +845,11 @@@ } } ++ /** {@inheritDoc} */ ++ @Override public String getLocalAddress() { ++ return locAddr; ++ } ++ /** * Sets local host address for socket binding. Note that one node could have * additional addresses beside the loopback one. This configuration @@@ -913,8 -913,8 +866,8 @@@ } /** {@inheritDoc} */ -- @Override public String getLocalAddress() { -- return locAddr; ++ @Override public int getLocalPort() { ++ return locPort; } /** @@@ -930,8 -930,8 +883,8 @@@ } /** {@inheritDoc} */ -- @Override public int getLocalPort() { -- return locPort; ++ @Override public int getLocalPortRange() { ++ return locPortRange; } /** @@@ -956,8 -956,8 +909,8 @@@ } /** {@inheritDoc} */ -- @Override public int getLocalPortRange() { -- return locPortRange; ++ @Override public int getSharedMemoryPort() { ++ return shmemPort; } /** @@@ -975,8 -975,8 +928,8 @@@ } /** {@inheritDoc} */ -- @Override public int getSharedMemoryPort() { -- return shmemPort; ++ @Override public long getIdleConnectionTimeout() { ++ return idleConnTimeout; } /** @@@ -993,11 -993,11 +946,6 @@@ } /** {@inheritDoc} */ -- @Override public long getIdleConnectionTimeout() { -- return idleConnTimeout; -- } -- -- /** {@inheritDoc} */ @Override public long getSocketWriteTimeout() { return sockWriteTimeout; } @@@ -1049,6 -1049,6 +997,12 @@@ this.unackedMsgsBufSize = unackedMsgsBufSize; } ++ /** {@inheritDoc} */ ++ @Deprecated ++ @Override public int getConnectionBufferSize() { ++ return 0; ++ } ++ /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. * @@@ -1064,7 -1064,7 +1018,7 @@@ /** {@inheritDoc} */ @Deprecated -- @Override public int getConnectionBufferSize() { ++ @Override public long getConnectionBufferFlushFrequency() { return 0; } @@@ -1076,9 -1076,9 +1030,8 @@@ } /** {@inheritDoc} */ -- @Deprecated -- @Override public long getConnectionBufferFlushFrequency() { -- return 0; ++ @Override public long getConnectTimeout() { ++ return connTimeout; } /** @@@ -1101,8 -1101,8 +1054,8 @@@ } /** {@inheritDoc} */ -- @Override public long getConnectTimeout() { -- return connTimeout; ++ @Override public long getMaxConnectTimeout() { ++ return maxConnTimeout; } /** @@@ -1127,8 -1127,8 +1080,8 @@@ } /** {@inheritDoc} */ -- @Override public long getMaxConnectTimeout() { -- return maxConnTimeout; ++ @Override public int getReconnectCount() { ++ return reconCnt; } /** @@@ -1149,8 -1149,8 +1102,8 @@@ } /** {@inheritDoc} */ -- @Override public int getReconnectCount() { -- return reconCnt; ++ @Override public boolean isDirectBuffer() { ++ return directBuf; } /** @@@ -1168,11 -1168,11 +1121,6 @@@ } /** {@inheritDoc} */ -- @Override public boolean isDirectBuffer() { -- return directBuf; -- } -- -- /** {@inheritDoc} */ @Override public boolean isDirectSendBuffer() { return directSndBuf; } @@@ -1189,6 -1189,6 +1137,11 @@@ this.directSndBuf = directSndBuf; } ++ /** {@inheritDoc} */ ++ @Override public int getSelectorsCount() { ++ return selectorsCnt; ++ } ++ /** * Sets the count of selectors te be used in TCP server. * <p/> @@@ -1202,8 -1202,8 +1155,8 @@@ } /** {@inheritDoc} */ -- @Override public int getSelectorsCount() { -- return selectorsCnt; ++ @Override public boolean isTcpNoDelay() { ++ return tcpNoDelay; } /** @@@ -1226,8 -1226,8 +1179,8 @@@ } /** {@inheritDoc} */ -- @Override public boolean isTcpNoDelay() { -- return tcpNoDelay; ++ @Override public int getSocketReceiveBuffer() { ++ return sockRcvBuf; } /** @@@ -1243,8 -1243,8 +1196,8 @@@ } /** {@inheritDoc} */ -- @Override public int getSocketReceiveBuffer() { -- return sockRcvBuf; ++ @Override public int getSocketSendBuffer() { ++ return sockSndBuf; } /** @@@ -1260,8 -1260,8 +1213,8 @@@ } /** {@inheritDoc} */ -- @Override public int getSocketSendBuffer() { -- return sockSndBuf; ++ @Override public int getMessageQueueLimit() { ++ return msgQueueLimit; } /** @@@ -1280,11 -1280,11 +1233,6 @@@ } /** {@inheritDoc} */ -- @Override public int getMessageQueueLimit() { -- return msgQueueLimit; -- } -- -- /** {@inheritDoc} */ @Override public int getSlowClientQueueLimit() { return slowClientQueueLimit; } @@@ -1305,6 -1305,6 +1253,12 @@@ this.slowClientQueueLimit = slowClientQueueLimit; } ++ /** {@inheritDoc} */ ++ @Deprecated ++ @Override public int getMinimumBufferedMessageCount() { ++ return 0; ++ } ++ /** * Sets the minimum number of messages for this SPI, that are buffered * prior to sending. @@@ -1318,17 -1318,17 +1272,6 @@@ // No-op. } -- /** {@inheritDoc} */ -- @Deprecated -- @Override public int getMinimumBufferedMessageCount() { -- return 0; -- } -- -- /** {@inheritDoc} */ -- @Override public void setListener(CommunicationListener<Message> lsnr) { -- this.lsnr = lsnr; -- } -- /** * @return Listener. */ @@@ -1337,6 -1337,6 +1280,11 @@@ } /** {@inheritDoc} */ ++ @Override public void setListener(CommunicationListener<Message> lsnr) { ++ this.lsnr = lsnr; ++ } ++ ++ /** {@inheritDoc} */ @Override public int getSentMessagesCount() { return sentMsgsCnt.intValue(); } @@@ -2832,732 -2844,732 +2792,732 @@@ } /** -- * This worker takes responsibility to shut the server down when stopping, -- * No other thread shall stop passed server. ++ * */ -- private class ShmemAcceptWorker extends GridWorker { ++ private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> { /** */ -- private final IpcSharedMemoryServerEndpoint srv; ++ private static final long serialVersionUID = 0L; ++ ++ // No-op. ++ } ++ ++ /** ++ * ++ */ ++ private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject { ++ /** */ ++ private final IgniteUuid id = IgniteUuid.randomUuid(); ++ ++ /** */ ++ private final T obj; ++ ++ /** */ ++ private final long endTime; ++ ++ /** */ ++ private final AtomicBoolean done = new AtomicBoolean(); /** -- * @param srv Server. ++ * @param obj Client. ++ * @param endTime End time. */ -- ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { -- super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); ++ private HandshakeTimeoutObject(T obj, long endTime) { ++ assert obj != null; ++ assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel; ++ assert endTime > 0; -- this.srv = srv; ++ this.obj = obj; ++ this.endTime = endTime; } -- /** {@inheritDoc} */ -- @Override protected void body() throws InterruptedException { -- try { -- while (!Thread.interrupted()) { -- ShmemWorker e = new ShmemWorker(srv.accept()); -- -- shmemWorkers.add(e); ++ /** ++ * @return {@code True} if object has not yet been timed out. ++ */ ++ boolean cancel() { ++ return done.compareAndSet(false, true); ++ } -- new IgniteThread(e).start(); -- } -- } -- catch (IgniteCheckedException e) { -- if (!isCancelled()) -- U.error(log, "Shmem server failed.", e); -- } -- finally { -- srv.close(); ++ /** {@inheritDoc} */ ++ @Override public void onTimeout() { ++ if (done.compareAndSet(false, true)) { ++ // Close socket - timeout occurred. ++ if (obj instanceof GridCommunicationClient) ++ ((GridCommunicationClient)obj).forceClose(); ++ else ++ U.closeQuiet((AbstractInterruptibleChannel)obj); } } /** {@inheritDoc} */ -- @Override public void cancel() { -- super.cancel(); ++ @Override public long endTime() { ++ return endTime; ++ } -- srv.close(); ++ /** {@inheritDoc} */ ++ @Override public IgniteUuid id() { ++ return id; ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(HandshakeTimeoutObject.class, this); } } /** -- * ++ * Handshake message. */ -- private class ShmemWorker extends GridWorker { ++ @SuppressWarnings("PublicInnerClass") ++ public static class HandshakeMessage implements Message { /** */ -- private final IpcEndpoint endpoint; ++ private static final long serialVersionUID = 0L; ++ ++ /** */ ++ private UUID nodeId; ++ ++ /** */ ++ private long rcvCnt; ++ ++ /** */ ++ private long connectCnt; /** -- * @param endpoint Endpoint. ++ * Default constructor required by {@link Message}. */ -- private ShmemWorker(IpcEndpoint endpoint) { -- super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); -- -- this.endpoint = endpoint; ++ public HandshakeMessage() { ++ // No-op. } -- /** {@inheritDoc} */ -- @Override protected void body() throws InterruptedException { -- try { -- MessageFactory msgFactory = new MessageFactory() { -- private MessageFactory impl; ++ /** ++ * @param nodeId Node ID. ++ * @param connectCnt Connect count. ++ * @param rcvCnt Number of received messages. ++ */ ++ public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { ++ assert nodeId != null; ++ assert rcvCnt >= 0 : rcvCnt; -- @Nullable @Override public Message create(byte type) { -- if (impl == null) -- impl = getSpiContext().messageFactory(); ++ this.nodeId = nodeId; ++ this.connectCnt = connectCnt; ++ this.rcvCnt = rcvCnt; ++ } -- assert impl != null; ++ /** ++ * @return Connect count. ++ */ ++ public long connectCount() { ++ return connectCnt; ++ } -- return impl.create(type); -- } -- }; ++ /** ++ * @return Number of received messages. ++ */ ++ public long received() { ++ return rcvCnt; ++ } -- MessageFormatter msgFormatter = new MessageFormatter() { -- private MessageFormatter impl; ++ /** ++ * @return Node ID. ++ */ ++ public UUID nodeId() { ++ return nodeId; ++ } -- @Override public MessageWriter writer() { -- if (impl == null) -- impl = getSpiContext().messageFormatter(); - - assert impl != null; ++ /** {@inheritDoc} */ ++ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { ++ if (buf.remaining() < 33) ++ return false; - return impl.writer(); - } - assert impl != null; ++ buf.put(HANDSHAKE_MSG_TYPE); - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - if (impl == null) - impl = getSpiContext().messageFormatter(); - return impl.writer(); - } ++ byte[] bytes = U.uuidToBytes(nodeId); - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - if (impl == null) - impl = getSpiContext().messageFormatter(); - -- assert impl != null; ++ assert bytes.length == 16 : bytes.length; -- return impl.reader(factory, msgCls); -- } -- }; ++ buf.put(bytes); -- IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>( -- metricsLsnr, -- log, -- endpoint, -- srvLsnr, -- msgFormatter, -- new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), -- new GridConnectionBytesVerifyFilter(log) -- ); ++ buf.putLong(rcvCnt); -- adapter.serve(); -- } -- finally { -- shmemWorkers.remove(this); ++ buf.putLong(connectCnt); -- endpoint.close(); -- } ++ return true; } /** {@inheritDoc} */ -- @Override public void cancel() { -- super.cancel(); ++ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { ++ if (buf.remaining() < 32) ++ return false; -- endpoint.close(); ++ byte[] nodeIdBytes = new byte[16]; ++ ++ buf.get(nodeIdBytes); ++ ++ nodeId = U.bytesToUuid(nodeIdBytes, 0); ++ ++ rcvCnt = buf.getLong(); ++ ++ connectCnt = buf.getLong(); ++ ++ return true; } -- /** @{@inheritDoc} */ -- @Override protected void cleanup() { -- super.cleanup(); ++ /** {@inheritDoc} */ ++ @Override public byte directType() { ++ return HANDSHAKE_MSG_TYPE; ++ } -- endpoint.close(); ++ /** {@inheritDoc} */ ++ @Override public byte fieldsCount() { ++ throw new UnsupportedOperationException(); } -- /** @{@inheritDoc} */ ++ /** {@inheritDoc} */ @Override public String toString() { -- return S.toString(ShmemWorker.class, this); ++ return S.toString(HandshakeMessage.class, this); } } /** -- * ++ * Recovery acknowledgment message. */ -- private class CommunicationWorker extends IgniteSpiThread { ++ @SuppressWarnings("PublicInnerClass") ++ public static class RecoveryLastReceivedMessage implements Message { /** */ -- private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); ++ private static final long serialVersionUID = 0L; ++ ++ /** */ ++ private long rcvCnt; /** -- * ++ * Default constructor required by {@link Message}. */ -- private CommunicationWorker() { -- super(gridName, "tcp-comm-worker", log); ++ public RecoveryLastReceivedMessage() { ++ // No-op. } -- /** {@inheritDoc} */ -- @Override protected void body() throws InterruptedException { -- if (log.isDebugEnabled()) -- log.debug("Tcp communication worker has been started."); -- -- while (!isInterrupted()) { -- GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); -- -- if (recoveryDesc != null) -- processRecovery(recoveryDesc); -- else -- processIdle(); -- } ++ /** ++ * @param rcvCnt Number of received messages. ++ */ ++ public RecoveryLastReceivedMessage(long rcvCnt) { ++ this.rcvCnt = rcvCnt; } /** -- * ++ * @return Number of received messages. */ -- private void processIdle() { -- cleanupRecovery(); ++ public long received() { ++ return rcvCnt; ++ } -- for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { -- UUID nodeId = e.getKey(); ++ /** {@inheritDoc} */ ++ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { ++ if (buf.remaining() < 9) ++ return false; -- GridCommunicationClient client = e.getValue(); ++ buf.put(RECOVERY_LAST_ID_MSG_TYPE); -- ClusterNode node = getSpiContext().node(nodeId); ++ buf.putLong(rcvCnt); -- if (node == null) { -- if (log.isDebugEnabled()) -- log.debug("Forcing close of non-existent node connection: " + nodeId); ++ return true; ++ } -- client.forceClose(); ++ /** {@inheritDoc} */ ++ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { ++ if (buf.remaining() < 8) ++ return false; -- clients.remove(nodeId, client); ++ rcvCnt = buf.getLong(); -- continue; -- } ++ return true; ++ } -- GridNioRecoveryDescriptor recovery = null; ++ /** {@inheritDoc} */ ++ @Override public byte directType() { ++ return RECOVERY_LAST_ID_MSG_TYPE; ++ } -- if (client instanceof GridTcpNioCommunicationClient) { -- recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); ++ /** {@inheritDoc} */ ++ @Override public byte fieldsCount() { ++ return 0; ++ } -- if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { -- RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(RecoveryLastReceivedMessage.class, this); ++ } ++ } -- if (log.isDebugEnabled()) -- log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + -- ", rcvCnt=" + msg.received() + ']'); ++ /** ++ * Node ID message. ++ */ ++ @SuppressWarnings("PublicInnerClass") ++ public static class NodeIdMessage implements Message { ++ /** */ ++ private static final long serialVersionUID = 0L; -- nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); ++ /** */ ++ private byte[] nodeIdBytes; -- recovery.lastAcknowledged(msg.received()); ++ /** */ ++ private byte[] nodeIdBytesWithType; -- continue; -- } -- } ++ /** */ ++ public NodeIdMessage() { ++ // No-op. ++ } -- long idleTime = client.getIdleTime(); ++ /** ++ * @param nodeId Node ID. ++ */ ++ private NodeIdMessage(UUID nodeId) { ++ assert nodeId != null; -- if (idleTime >= idleConnTimeout) { -- if (recovery != null && -- recovery.nodeAlive(getSpiContext().node(nodeId)) && -- !recovery.messagesFutures().isEmpty()) { -- if (log.isDebugEnabled()) -- log.debug("Node connection is idle, but there are unacknowledged messages, " + -- "will wait: " + nodeId); ++ nodeIdBytes = U.uuidToBytes(nodeId); -- continue; -- } ++ nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; -- if (log.isDebugEnabled()) -- log.debug("Closing idle node connection: " + nodeId); ++ nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE; -- if (client.close() || client.closed()) -- clients.remove(nodeId, client); -- } -- } ++ System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, nodeIdBytes.length); } -- /** -- * -- */ -- private void cleanupRecovery() { -- Set<ClientKey> left = null; ++ /** {@inheritDoc} */ ++ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { ++ assert nodeIdBytes.length == 16; -- for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { -- if (left != null && left.contains(e.getKey())) -- continue; ++ if (buf.remaining() < 17) ++ return false; -- GridNioRecoveryDescriptor recoverySnd = e.getValue(); ++ buf.put(NODE_ID_MSG_TYPE); ++ buf.put(nodeIdBytes); -- if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { -- if (left == null) -- left = new HashSet<>(); ++ return true; ++ } -- left.add(e.getKey()); -- } -- } ++ /** {@inheritDoc} */ ++ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { ++ if (buf.remaining() < 16) ++ return false; -- if (left != null) { -- assert !left.isEmpty(); ++ nodeIdBytes = new byte[16]; -- for (ClientKey id : left) { -- GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); ++ buf.get(nodeIdBytes); -- if (recoverySnd != null) -- recoverySnd.onNodeLeft(); -- } -- } ++ return true; } -- /** -- * @param recoveryDesc Recovery descriptor. -- */ -- private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { -- ClusterNode node = recoveryDesc.node(); ++ /** {@inheritDoc} */ ++ @Override public byte directType() { ++ return NODE_ID_MSG_TYPE; ++ } -- try { -- if (clients.containsKey(node.id()) || -- !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || -- !getSpiContext().pingNode(node.id())) -- return; -- } -- catch (IgniteClientDisconnectedException e) { -- if (log.isDebugEnabled()) -- log.debug("Failed to ping node, client disconnected."); ++ /** {@inheritDoc} */ ++ @Override public byte fieldsCount() { ++ return 0; ++ } -- return; -- } ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(NodeIdMessage.class, this); ++ } ++ } -- try { -- if (log.isDebugEnabled()) -- log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); ++ /** ++ * This worker takes responsibility to shut the server down when stopping, ++ * No other thread shall stop passed server. ++ */ ++ private class ShmemAcceptWorker extends GridWorker { ++ /** */ ++ private final IpcSharedMemoryServerEndpoint srv; -- GridCommunicationClient client = reserveClient(node); ++ /** ++ * @param srv Server. ++ */ ++ ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { ++ super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); -- client.release(); -- } -- catch (IgniteCheckedException | IgniteException e) { -- if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { -- if (log.isDebugEnabled()) -- log.debug("Recovery reconnect failed, will retry " + -- "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); ++ this.srv = srv; ++ } -- addReconnectRequest(recoveryDesc); -- } -- else { -- if (log.isDebugEnabled()) -- log.debug("Recovery reconnect failed, " + -- "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); ++ /** {@inheritDoc} */ ++ @Override protected void body() throws InterruptedException { ++ try { ++ while (!Thread.interrupted()) { ++ ShmemWorker e = new ShmemWorker(srv.accept()); -- onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", -- e); ++ shmemWorkers.add(e); ++ ++ new IgniteThread(e).start(); } } ++ catch (IgniteCheckedException e) { ++ if (!isCancelled()) ++ U.error(log, "Shmem server failed.", e); ++ } ++ finally { ++ srv.close(); ++ } } -- /** -- * @param recoverySnd Recovery send data. -- */ -- void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) { -- boolean add = q.add(recoverySnd); ++ /** {@inheritDoc} */ ++ @Override public void cancel() { ++ super.cancel(); -- assert add; ++ srv.close(); } } /** * */ -- private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> { ++ private class ShmemWorker extends GridWorker { /** */ -- private static final long serialVersionUID = 0L; ++ private final IpcEndpoint endpoint; -- // No-op. -- } ++ /** ++ * @param endpoint Endpoint. ++ */ ++ private ShmemWorker(IpcEndpoint endpoint) { ++ super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); -- /** -- * -- */ -- private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject { -- /** */ -- private final IgniteUuid id = IgniteUuid.randomUuid(); ++ this.endpoint = endpoint; ++ } -- /** */ -- private final T obj; ++ /** {@inheritDoc} */ ++ @Override protected void body() throws InterruptedException { ++ try { ++ MessageFactory msgFactory = new MessageFactory() { ++ private MessageFactory impl; -- /** */ -- private final long endTime; ++ @Nullable @Override public Message create(byte type) { ++ if (impl == null) ++ impl = getSpiContext().messageFactory(); -- /** */ -- private final AtomicBoolean done = new AtomicBoolean(); ++ assert impl != null; -- /** -- * @param obj Client. -- * @param endTime End time. -- */ -- private HandshakeTimeoutObject(T obj, long endTime) { -- assert obj != null; -- assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel; -- assert endTime > 0; ++ return impl.create(type); ++ } ++ }; -- this.obj = obj; -- this.endTime = endTime; -- } ++ MessageFormatter msgFormatter = new MessageFormatter() { ++ private MessageFormatter impl; -- /** -- * @return {@code True} if object has not yet been timed out. -- */ -- boolean cancel() { -- return done.compareAndSet(false, true); -- } ++ @Override public MessageWriter writer() { ++ if (impl == null) ++ impl = getSpiContext().messageFormatter(); -- /** {@inheritDoc} */ -- @Override public void onTimeout() { -- if (done.compareAndSet(false, true)) { -- // Close socket - timeout occurred. -- if (obj instanceof GridCommunicationClient) -- ((GridCommunicationClient)obj).forceClose(); -- else -- U.closeQuiet((AbstractInterruptibleChannel)obj); ++ assert impl != null; ++ ++ return impl.writer(); ++ } ++ ++ @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { ++ if (impl == null) ++ impl = getSpiContext().messageFormatter(); ++ ++ assert impl != null; ++ ++ return impl.reader(factory, msgCls); ++ } ++ }; ++ ++ IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>( ++ metricsLsnr, ++ log, ++ endpoint, ++ srvLsnr, ++ msgFormatter, ++ new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), ++ new GridConnectionBytesVerifyFilter(log) ++ ); ++ ++ adapter.serve(); ++ } ++ finally { ++ shmemWorkers.remove(this); ++ ++ endpoint.close(); } } /** {@inheritDoc} */ -- @Override public long endTime() { -- return endTime; ++ @Override public void cancel() { ++ super.cancel(); ++ ++ endpoint.close(); } -- /** {@inheritDoc} */ -- @Override public IgniteUuid id() { -- return id; ++ /** @{@inheritDoc} */ ++ @Override protected void cleanup() { ++ super.cleanup(); ++ ++ endpoint.close(); } -- /** {@inheritDoc} */ ++ /** @{@inheritDoc} */ @Override public String toString() { -- return S.toString(HandshakeTimeoutObject.class, this); ++ return S.toString(ShmemWorker.class, this); } } /** * */ -- private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> { -- /** */ -- private static final long serialVersionUID = 0L; -- ++ private class CommunicationWorker extends IgniteSpiThread { /** */ -- private final UUID rmtNodeId; ++ private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); /** -- * @param rmtNodeId Remote node ID. ++ * */ -- private HandshakeClosure(UUID rmtNodeId) { -- this.rmtNodeId = rmtNodeId; ++ private CommunicationWorker() { ++ super(gridName, "tcp-comm-worker", log); } /** {@inheritDoc} */ -- @SuppressWarnings("ThrowFromFinallyBlock") -- @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException { -- try { -- // Handshake. -- byte[] b = new byte[17]; ++ @Override protected void body() throws InterruptedException { ++ if (log.isDebugEnabled()) ++ log.debug("Tcp communication worker has been started."); -- int n = 0; ++ while (!isInterrupted()) { ++ GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); -- while (n < 17) { -- int cnt = in.read(b, n, 17 - n); ++ if (recoveryDesc != null) ++ processRecovery(recoveryDesc); ++ else ++ processIdle(); ++ } ++ } -- if (cnt < 0) -- throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)"); ++ /** ++ * ++ */ ++ private void processIdle() { ++ cleanupRecovery(); -- n += cnt; -- } ++ for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { ++ UUID nodeId = e.getKey(); -- // First 4 bytes are for length. -- UUID id = U.bytesToUuid(b, 1); ++ GridCommunicationClient client = e.getValue(); -- if (!rmtNodeId.equals(id)) -- throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + -- ", rcvd=" + id + ']'); -- else if (log.isDebugEnabled()) -- log.debug("Received remote node ID: " + id); -- } -- catch (SocketTimeoutException e) { -- throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " + -- "'connectionTimeout' configuration property).", e); -- } -- catch (IOException e) { -- throw new IgniteCheckedException("Failed to perform handshake.", e); -- } ++ ClusterNode node = getSpiContext().node(nodeId); -- try { -- ClusterNode localNode = getLocalNode(); ++ if (node == null) { ++ if (log.isDebugEnabled()) ++ log.debug("Forcing close of non-existent node connection: " + nodeId); -- if (localNode == null) -- throw new IgniteSpiException("Local node has not been started or fully initialized " + -- "[isStopping=" + getSpiContext().isStopping() + ']'); ++ client.forceClose(); -- UUID id = localNode.id(); ++ clients.remove(nodeId, client); -- NodeIdMessage msg = new NodeIdMessage(id); ++ continue; ++ } -- out.write(U.IGNITE_HEADER); -- out.write(NODE_ID_MSG_TYPE); -- out.write(msg.nodeIdBytes); ++ GridNioRecoveryDescriptor recovery = null; -- out.flush(); ++ if (client instanceof GridTcpNioCommunicationClient) { ++ recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); -- if (log.isDebugEnabled()) -- log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']'); -- } -- catch (IOException e) { -- throw new IgniteCheckedException("Failed to perform handshake.", e); -- } -- } -- } ++ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { ++ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); -- /** -- * Handshake message. -- */ -- @SuppressWarnings("PublicInnerClass") -- public static class HandshakeMessage implements Message { -- /** */ -- private static final long serialVersionUID = 0L; ++ if (log.isDebugEnabled()) ++ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + ++ ", rcvCnt=" + msg.received() + ']'); -- /** */ -- private UUID nodeId; ++ nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); -- /** */ -- private long rcvCnt; ++ recovery.lastAcknowledged(msg.received()); -- /** */ -- private long connectCnt; ++ continue; ++ } ++ } -- /** -- * Default constructor required by {@link Message}. -- */ -- public HandshakeMessage() { -- // No-op. -- } ++ long idleTime = client.getIdleTime(); -- /** -- * @param nodeId Node ID. -- * @param connectCnt Connect count. -- * @param rcvCnt Number of received messages. -- */ -- public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { -- assert nodeId != null; -- assert rcvCnt >= 0 : rcvCnt; ++ if (idleTime >= idleConnTimeout) { ++ if (recovery != null && ++ recovery.nodeAlive(getSpiContext().node(nodeId)) && ++ !recovery.messagesFutures().isEmpty()) { ++ if (log.isDebugEnabled()) ++ log.debug("Node connection is idle, but there are unacknowledged messages, " + ++ "will wait: " + nodeId); -- this.nodeId = nodeId; -- this.connectCnt = connectCnt; -- this.rcvCnt = rcvCnt; -- } ++ continue; ++ } -- /** -- * @return Connect count. -- */ -- public long connectCount() { -- return connectCnt; -- } ++ if (log.isDebugEnabled()) ++ log.debug("Closing idle node connection: " + nodeId); -- /** -- * @return Number of received messages. -- */ -- public long received() { -- return rcvCnt; ++ if (client.close() || client.closed()) ++ clients.remove(nodeId, client); ++ } ++ } } /** -- * @return Node ID. ++ * */ -- public UUID nodeId() { -- return nodeId; -- } -- -- /** {@inheritDoc} */ -- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { -- if (buf.remaining() < 33) -- return false; -- -- buf.put(HANDSHAKE_MSG_TYPE); -- -- byte[] bytes = U.uuidToBytes(nodeId); -- -- assert bytes.length == 16 : bytes.length; -- -- buf.put(bytes); -- -- buf.putLong(rcvCnt); -- -- buf.putLong(connectCnt); -- -- return true; -- } -- -- /** {@inheritDoc} */ -- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { -- if (buf.remaining() < 32) -- return false; -- -- byte[] nodeIdBytes = new byte[16]; -- -- buf.get(nodeIdBytes); -- -- nodeId = U.bytesToUuid(nodeIdBytes, 0); -- -- rcvCnt = buf.getLong(); -- -- connectCnt = buf.getLong(); -- -- return true; -- } ++ private void cleanupRecovery() { ++ Set<ClientKey> left = null; -- /** {@inheritDoc} */ -- @Override public byte directType() { -- return HANDSHAKE_MSG_TYPE; -- } ++ for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { ++ if (left != null && left.contains(e.getKey())) ++ continue; -- /** {@inheritDoc} */ -- @Override public byte fieldsCount() { -- throw new UnsupportedOperationException(); -- } ++ GridNioRecoveryDescriptor recoverySnd = e.getValue(); -- /** {@inheritDoc} */ -- @Override public String toString() { -- return S.toString(HandshakeMessage.class, this); -- } -- } ++ if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { ++ if (left == null) ++ left = new HashSet<>(); -- /** -- * Recovery acknowledgment message. -- */ -- @SuppressWarnings("PublicInnerClass") -- public static class RecoveryLastReceivedMessage implements Message { -- /** */ -- private static final long serialVersionUID = 0L; ++ left.add(e.getKey()); ++ } ++ } -- /** */ -- private long rcvCnt; ++ if (left != null) { ++ assert !left.isEmpty(); -- /** -- * Default constructor required by {@link Message}. -- */ -- public RecoveryLastReceivedMessage() { -- // No-op. -- } ++ for (ClientKey id : left) { ++ GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); -- /** -- * @param rcvCnt Number of received messages. -- */ -- public RecoveryLastReceivedMessage(long rcvCnt) { -- this.rcvCnt = rcvCnt; ++ if (recoverySnd != null) ++ recoverySnd.onNodeLeft(); ++ } ++ } } /** -- * @return Number of received messages. ++ * @param recoveryDesc Recovery descriptor. */ -- public long received() { -- return rcvCnt; -- } -- -- /** {@inheritDoc} */ -- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { -- if (buf.remaining() < 9) -- return false; ++ private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { ++ ClusterNode node = recoveryDesc.node(); -- buf.put(RECOVERY_LAST_ID_MSG_TYPE); ++ try { ++ if (clients.containsKey(node.id()) || ++ !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || ++ !getSpiContext().pingNode(node.id())) ++ return; ++ } ++ catch (IgniteClientDisconnectedException e) { ++ if (log.isDebugEnabled()) ++ log.debug("Failed to ping node, client disconnected."); -- buf.putLong(rcvCnt); ++ return; ++ } -- return true; -- } ++ try { ++ if (log.isDebugEnabled()) ++ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); -- /** {@inheritDoc} */ -- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { -- if (buf.remaining() < 8) -- return false; ++ GridCommunicationClient client = reserveClient(node); -- rcvCnt = buf.getLong(); ++ client.release(); ++ } ++ catch (IgniteCheckedException | IgniteException e) { ++ if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { ++ if (log.isDebugEnabled()) ++ log.debug("Recovery reconnect failed, will retry " + ++ "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); -- return true; -- } ++ addReconnectRequest(recoveryDesc); ++ } ++ else { ++ if (log.isDebugEnabled()) ++ log.debug("Recovery reconnect failed, " + ++ "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); -- /** {@inheritDoc} */ -- @Override public byte directType() { -- return RECOVERY_LAST_ID_MSG_TYPE; ++ onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", ++ e); ++ } ++ } } -- /** {@inheritDoc} */ -- @Override public byte fieldsCount() { -- return 0; -- } ++ /** ++ * @param recoverySnd Recovery send data. ++ */ ++ void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) { ++ boolean add = q.add(recoverySnd); -- /** {@inheritDoc} */ -- @Override public String toString() { -- return S.toString(RecoveryLastReceivedMessage.class, this); ++ assert add; } } /** -- * Node ID message. ++ * */ -- @SuppressWarnings("PublicInnerClass") -- public static class NodeIdMessage implements Message { ++ private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> { /** */ private static final long serialVersionUID = 0L; /** */ -- private byte[] nodeIdBytes; -- -- /** */ -- private byte[] nodeIdBytesWithType; -- -- /** */ -- public NodeIdMessage() { -- // No-op. -- } ++ private final UUID rmtNodeId; /** -- * @param nodeId Node ID. ++ * @param rmtNodeId Remote node ID. */ -- private NodeIdMessage(UUID nodeId) { -- assert nodeId != null; -- -- nodeIdBytes = U.uuidToBytes(nodeId); ++ private HandshakeClosure(UUID rmtNodeId) { ++ this.rmtNodeId = rmtNodeId; ++ } -- nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; ++ /** {@inheritDoc} */ ++ @SuppressWarnings("ThrowFromFinallyBlock") ++ @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException { ++ try { ++ // Handshake. ++ byte[] b = new byte[17]; -- nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE; ++ int n = 0; -- System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, nodeIdBytes.length); -- } ++ while (n < 17) { ++ int cnt = in.read(b, n, 17 - n); -- /** {@inheritDoc} */ -- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { -- assert nodeIdBytes.length == 16; ++ if (cnt < 0) ++ throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)"); -- if (buf.remaining() < 17) -- return false; ++ n += cnt; ++ } -- buf.put(NODE_ID_MSG_TYPE); -- buf.put(nodeIdBytes); ++ // First 4 bytes are for length. ++ UUID id = U.bytesToUuid(b, 1); -- return true; -- } ++ if (!rmtNodeId.equals(id)) ++ throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + ++ ", rcvd=" + id + ']'); ++ else if (log.isDebugEnabled()) ++ log.debug("Received remote node ID: " + id); ++ } ++ catch (SocketTimeoutException e) { ++ throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " + ++ "'connectionTimeout' configuration property).", e); ++ } ++ catch (IOException e) { ++ throw new IgniteCheckedException("Failed to perform handshake.", e); ++ } -- /** {@inheritDoc} */ -- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { -- if (buf.remaining() < 16) -- return false; ++ try { ++ ClusterNode localNode = getLocalNode(); -- nodeIdBytes = new byte[16]; ++ if (localNode == null) ++ throw new IgniteSpiException("Local node has not been started or fully initialized " + ++ "[isStopping=" + getSpiContext().isStopping() + ']'); -- buf.get(nodeIdBytes); ++ UUID id = localNode.id(); -- return true; -- } ++ NodeIdMessage msg = new NodeIdMessage(id); -- /** {@inheritDoc} */ -- @Override public byte directType() { -- return NODE_ID_MSG_TYPE; -- } ++ out.write(U.IGNITE_HEADER); ++ out.write(NODE_ID_MSG_TYPE); ++ out.write(msg.nodeIdBytes); -- /** {@inheritDoc} */ -- @Override public byte fieldsCount() { -- return 0; -- } ++ out.flush(); -- /** {@inheritDoc} */ -- @Override public String toString() { -- return S.toString(NodeIdMessage.class, this); ++ if (log.isDebugEnabled()) ++ log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']'); ++ } ++ catch (IOException e) { ++ throw new IgniteCheckedException("Failed to perform handshake.", e); ++ } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java index d3ba2d6,87a30a6..fa4da5a --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java @@@ -42,17 -43,17 +43,13 @@@ import static org.apache.ignite.cache.C */ public class IgniteCacheConfigurationTemplateTest extends GridCommonAbstractTest { /** */ -- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); -- -- /** */ private static final String TEMPLATE1 = "org.apache.ignite*"; -- /** */ private static final String TEMPLATE2 = "org.apache.ignite.test.*"; -- /** */ private static final String TEMPLATE3 = "org.apache.ignite.test2.*"; -- ++ /** */ ++ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ private boolean clientMode; @@@ -429,4 -460,4 +456,4 @@@ assertNull(ignite.cache(TEMPLATE3)); } } --} ++}