Merge master into ignite-843

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d145cb70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d145cb70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d145cb70

Branch: refs/heads/ignite-843
Commit: d145cb7093c53069c514fcf6002f65f43ad2542e
Parents: c01723f 6844370
Author: Andrey <anovi...@gridgain.com>
Authored: Tue Oct 13 09:23:10 2015 +0700
Committer: Andrey <anovi...@gridgain.com>
Committed: Tue Oct 13 09:23:19 2015 +0700

----------------------------------------------------------------------
 .../src/main/js/views/sql/chart-settings.jade   |    2 +-
 .../org/apache/ignite/IgniteTransactions.java   |    4 -
 .../processors/cache/GridCacheIoManager.java    |   93 +-
 .../processors/cache/GridCacheProcessor.java    |   23 +-
 .../dht/GridClientPartitionTopology.java        |   36 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   12 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   28 +-
 .../GridDhtPartitionDemandMessage.java          |    4 +-
 .../GridDhtPartitionSupplyMessage.java          |    3 +-
 .../GridDhtPartitionsExchangeFuture.java        |   70 +-
 .../preloader/GridDhtPartitionsFullMessage.java |   12 +-
 .../GridDhtPartitionsSingleMessage.java         |   11 +-
 .../dht/preloader/GridDhtPreloader.java         |   34 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 1394 +++++++++---------
 .../IgniteCacheConfigurationTemplateTest.java   |   41 +-
 .../dht/GridCacheDhtPreloadPerformanceTest.java |  133 ++
 .../near/GridCacheNearTxForceKeyTest.java       |    2 +-
 17 files changed, 995 insertions(+), 907 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/control-center-web/src/main/js/views/sql/chart-settings.jade
----------------------------------------------------------------------
diff --cc modules/control-center-web/src/main/js/views/sql/chart-settings.jade
index 4b350a4,0000000..c1bedd1
mode 100644,000000..100644
--- a/modules/control-center-web/src/main/js/views/sql/chart-settings.jade
+++ b/modules/control-center-web/src/main/js/views/sql/chart-settings.jade
@@@ -1,38 -1,0 +1,38 @@@
 +//-
 +    Licensed to the Apache Software Foundation (ASF) under one or more
 +    contributor license agreements.  See the NOTICE file distributed with
 +    this work for additional information regarding copyright ownership.
 +    The ASF licenses this file to You under the Apache License, Version 2.0
 +    (the "License"); you may not use this file except in compliance with
 +    the License.  You may obtain a copy of the License at
 +
 +         http://www.apache.org/licenses/LICENSE-2.0
 +
 +    Unless required by applicable law or agreed to in writing, software
 +    distributed under the License is distributed on an "AS IS" BASIS,
 +    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +    See the License for the specific language governing permissions and
 +    limitations under the License.
 +
- .popover.settings(tabindex='-1' style='width: 300px')
++.popover.settings(tabindex='-1')
 +    .arrow
 +    h3.popover-title(style='color: black') Chart settings
 +    button.close(id='chart-settings-close' ng-click='$hide()') &times;
 +    .popover-content
 +        form.form-horizontal.chart-settings(name='chartSettingsForm' 
novalidate)
 +            .form-group.chart-settings
 +                label All columns (drag columns to axis)
 +                
ul.chart-settings-columns-list(dnd-list='paragraph.chartColumns' 
dnd-allowed-types='[]')
 +                    li(ng-repeat='col in paragraph.chartColumns track by 
$index')
 +                        
.btn.btn-default.btn-chart-column-movable(dnd-draggable='col' 
dnd-effect-allowed='copy') {{col.label}}
 +                label X axis (accept only one column)
 +                
ul.chart-settings-columns-list(dnd-list='paragraph.chartKeyCols' 
dnd-drop='chartAcceptKeyColumn(paragraph, item)')
 +                    li(ng-repeat='col in paragraph.chartKeyCols track by 
$index')
 +                        .btn.btn-info.btn-chart-column {{col.label}}
 +                            
i.fa.fa-close(ng-click='chartRemoveKeyColumn(paragraph, $index)')
 +                label Y axis (accept only numeric columns)
 +                
ul.chart-settings-columns-list(dnd-list='paragraph.chartValCols' 
dnd-drop='chartAcceptValColumn(paragraph, item)')
 +                    li(ng-repeat='col in paragraph.chartValCols track by 
$index')
 +                        .btn.btn-success.btn-chart-column {{col.label}}
 +                            
i.fa.fa-close(ng-click='chartRemoveValColumn(paragraph, $index)')
 +

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c81dae5,476a96c..85ebe94
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@@ -77,33 -75,30 +75,22 @@@ import static org.apache.ignite.interna
  public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      /** Message ID generator. */
      private static final AtomicLong idGen = new AtomicLong();
--
++    /** Mutex. */
++    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
      /** Delay in milliseconds between retries. */
      private long retryDelay;
--
      /** Number of retries using to send messages. */
      private int retryCnt;
--
      /** Indexed class handlers. */
      private Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new 
HashMap<>();
--
      /** Handler registry. */
      private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, 
GridCacheMessage>>
          clsHandlers = new ConcurrentHashMap8<>();
--
      /** Ordered handler registry. */
      private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends 
GridCacheMessage>> orderedHandlers =
          new ConcurrentHashMap8<>();
--
      /** Stopping flag. */
      private boolean stopping;
- 
-     /** Error flag. */
-     private final AtomicBoolean startErr = new AtomicBoolean();
--
--    /** Mutex. */
--    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
--
      /** Deployment enabled. */
      private boolean depEnabled;
  
@@@ -982,32 -981,32 +973,6 @@@
      }
  
      /**
--     * Ordered message listener.
--     */
--    private class OrderedMessageListener implements GridMessageListener {
--        /** */
--        private final IgniteBiInClosure<UUID, GridCacheMessage> c;
--
--        /**
--         * @param c Handler closure.
--         */
--        OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
--            this.c = c;
--        }
--
--        /** {@inheritDoc} */
--        @SuppressWarnings({"CatchGenericClass", "unchecked"})
--        @Override public void onMessage(final UUID nodeId, Object msg) {
--            if (log.isDebugEnabled())
--                log.debug("Received cache ordered message [nodeId=" + nodeId 
+ ", msg=" + msg + ']');
--
--            final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
--
--            onMessage0(nodeId, cacheMsg, c);
--        }
--    }
--
--    /**
       *
       */
      private static class ListenerKey {
@@@ -1048,4 -1047,4 +1013,30 @@@
              return res;
          }
      }
++
++    /**
++     * Ordered message listener.
++     */
++    private class OrderedMessageListener implements GridMessageListener {
++        /** */
++        private final IgniteBiInClosure<UUID, GridCacheMessage> c;
++
++        /**
++         * @param c Handler closure.
++         */
++        OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
++            this.c = c;
++        }
++
++        /** {@inheritDoc} */
++        @SuppressWarnings({"CatchGenericClass", "unchecked"})
++        @Override public void onMessage(final UUID nodeId, Object msg) {
++            if (log.isDebugEnabled())
++                log.debug("Received cache ordered message [nodeId=" + nodeId 
+ ", msg=" + msg + ']');
++
++            final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
++
++            onMessage0(nodeId, cacheMsg, c);
++        }
++    }
  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5e3cc0b,162c116..a62ea3d
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -60,40 -60,40 +60,29 @@@ public class GridClientPartitionTopolog
  
      /** Flag to control amount of output for full map. */
      private static final boolean FULL_MAP_DEBUG = false;
--
++    /** Logger. */
++    private final IgniteLogger log;
++    /** */
++    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
++    /** Lock. */
++    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      /** Cache shared context. */
      private GridCacheSharedContext cctx;
--
      /** Cache ID. */
      private int cacheId;
--
--    /** Logger. */
--    private final IgniteLogger log;
--
      /** Node to partition map. */
      private GridDhtPartitionFullMap node2part;
--
      /** Partition to node map. */
      private Map<Integer, Set<UUID>> part2node = new HashMap<>();
--
      /** */
      private GridDhtPartitionExchangeId lastExchangeId;
--
      /** */
      private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
--
      /** */
      private volatile boolean stopping;
--
      /** A future that will be completed when topology with version topVer 
will be ready to use. */
      private GridDhtTopologyFuture topReadyFut;
  
--    /** */
--    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
--
--    /** Lock. */
--    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
--
      /**
       * @param cctx Context.
       * @param cacheId Cache ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 2deabfe,4f124e6..06272ac
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@@ -100,21 -100,21 +100,16 @@@ public class GridDhtLocalPartition impl
      /** Create time. */
      @GridToStringExclude
      private final long createTime = U.currentTimeMillis();
--
--    /** Eviction history. */
--    private volatile Map<KeyCacheObject, GridCacheVersion> evictHist = new 
HashMap<>();
--
      /** Lock. */
      private final ReentrantLock lock = new ReentrantLock();
--
      /** Public size counter. */
      private final LongAdder8 mapPubSize = new LongAdder8();
--
      /** Remove queue. */
      private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> 
rmvQueue;
--
      /** Group reservations. */
      private final CopyOnWriteArrayList<GridDhtPartitionsReservation> 
reservations = new CopyOnWriteArrayList<>();
++    /** Eviction history. */
++    private volatile Map<KeyCacheObject, GridCacheVersion> evictHist = new 
HashMap<>();
  
      /**
       * @param cctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0c9c88,6bd283a..ba76eca
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -77,31 -77,31 +77,23 @@@ class GridDhtPartitionTopologyImpl impl
      /** */
      private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts =
          new ConcurrentHashMap8<>();
--
++    /** */
++    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
++    /** Lock. */
++    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      /** Node to partition map. */
      private GridDhtPartitionFullMap node2part;
--
      /** Partition to node map. */
      private Map<Integer, Set<UUID>> part2node = new HashMap<>();
--
      /** */
      private GridDhtPartitionExchangeId lastExchangeId;
--
      /** */
      private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
--
      /** */
      private volatile boolean stopping;
--
      /** A future that will be completed when topology with version topVer 
will be ready to use. */
      private GridDhtTopologyFuture topReadyFut;
  
--    /** */
--    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
--
--    /** Lock. */
--    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
--
      /**
       * @param cctx Context.
       */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1b03c1,77e47a7..7230393
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -101,83 -101,83 +101,61 @@@ public class GridDhtPartitionsExchangeF
  
      /** Dummy reassign flag. */
      private final boolean reassign;
--
--    /** Discovery event. */
--    private volatile DiscoveryEvent discoEvt;
--
      /** */
      @GridToStringInclude
      private final Collection<UUID> rcvdIds = new GridConcurrentHashSet<>();
--
--    /** Remote nodes. */
--    private volatile Collection<ClusterNode> rmtNodes;
--
--    /** Remote nodes. */
--    @GridToStringInclude
--    private volatile Collection<UUID> rmtIds;
--
      /** Oldest node. */
      @GridToStringExclude
      private final AtomicReference<ClusterNode> oldestNode = new 
AtomicReference<>();
--
      /** ExchangeFuture id. */
      private final GridDhtPartitionExchangeId exchId;
--
      /** Init flag. */
      @GridToStringInclude
      private final AtomicBoolean init = new AtomicBoolean(false);
--
      /** Ready for reply flag. */
      @GridToStringInclude
      private final AtomicBoolean ready = new AtomicBoolean(false);
--
      /** Replied flag. */
      @GridToStringInclude
      private final AtomicBoolean replied = new AtomicBoolean(false);
--
++    /** Cache context. */
++    private final GridCacheSharedContext<?, ?> cctx;
++    /**
++     * Messages received on non-coordinator are stored in case if this node
++     * becomes coordinator.
++     */
++    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new 
ConcurrentHashMap8<>();
++    /** Messages received from new coordinator. */
++    private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new 
ConcurrentHashMap8<>();
++    /** */
++    private final Object mux = new Object();
++    /** Discovery event. */
++    private volatile DiscoveryEvent discoEvt;
++    /** Remote nodes. */
++    private volatile Collection<ClusterNode> rmtNodes;
++    /** Remote nodes. */
++    @GridToStringInclude
++    private volatile Collection<UUID> rmtIds;
      /** Timeout object. */
      @GridToStringExclude
      private volatile GridTimeoutObject timeoutObj;
--
--    /** Cache context. */
--    private final GridCacheSharedContext<?, ?> cctx;
--
      /** Busy lock to prevent activities from accessing exchanger while it's 
stopping. */
      private ReadWriteLock busyLock;
--
      /** */
      private AtomicBoolean added = new AtomicBoolean(false);
--
      /** Event latch. */
      @GridToStringExclude
      private CountDownLatch evtLatch = new CountDownLatch(1);
--
      /** */
      private GridFutureAdapter<Boolean> initFut;
--
      /** Topology snapshot. */
      private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new 
AtomicReference<>();
--
      /** Last committed cache version before next topology version use. */
      private AtomicReference<GridCacheVersion> lastVer = new 
AtomicReference<>();
--
--    /**
--     * Messages received on non-coordinator are stored in case if this node
--     * becomes coordinator.
--     */
--    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new 
ConcurrentHashMap8<>();
--
--    /** Messages received from new coordinator. */
--    private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new 
ConcurrentHashMap8<>();
--
      /** */
      @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
      @GridToStringInclude
      private volatile IgniteInternalFuture<?> partReleaseFut;
--
--    /** */
--    private final Object mux = new Object();
--
      /** Logger. */
      private IgniteLogger log;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 19b461e,74237f8..df1e10b
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@@ -69,28 -70,28 +70,20 @@@ import static org.apache.ignite.interna
  public class GridDhtPreloader extends GridCachePreloaderAdapter {
      /** Default preload resend timeout. */
      public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
--
--    /** */
--    private GridDhtPartitionTopology top;
--
      /** Topology version. */
      private final GridAtomicLong topVer = new GridAtomicLong();
--
      /** Force key futures. */
      private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> 
forceKeyFuts = newMap();
--
++    /** Busy lock to prevent activities from accessing exchanger while it's 
stopping. */
++    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
++    /** */
++    private GridDhtPartitionTopology top;
      /** Partition suppliers. */
      private GridDhtPartitionSupplyPool supplyPool;
--
      /** Partition demanders. */
      private GridDhtPartitionDemandPool demandPool;
--
      /** Start future. */
      private GridFutureAdapter<Object> startFut;
--
--    /** Busy lock to prevent activities from accessing exchanger while it's 
stopping. */
--    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
--
      /** Pending affinity assignment futures. */
      private ConcurrentMap<AffinityTopologyVersion, 
GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
          new ConcurrentHashMap8<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c93d5af,5ea2c02..0e87158
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@@ -281,47 -281,47 +281,125 @@@ public class TcpCommunicationSpi extend
       * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
       */
      public static final int DFLT_SELECTORS_CNT = Math.min(4, 
Runtime.getRuntime().availableProcessors());
--
--    /** Node ID meta for session. */
--    private static final int NODE_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
--
--    /** Message tracker meta for session. */
--    private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
--
      /**
       * Default local port range (value is <tt>100</tt>).
       * See {@link #setLocalPortRange(int)} for details.
       */
      public static final int DFLT_PORT_RANGE = 100;
--
      /** Default value for {@code TCP_NODELAY} socket option (value is 
<tt>true</tt>). */
      public static final boolean DFLT_TCP_NODELAY = true;
--
      /** Default received messages threshold for sending ack. */
      public static final int DFLT_ACK_SND_THRESHOLD = 16;
--
      /** Default socket write timeout. */
      public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
--
++    /** Node ID message type. */
++    public static final byte NODE_ID_MSG_TYPE = -1;
++    /** */
++    public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
++    /** */
++    public static final byte HANDSHAKE_MSG_TYPE = -3;
++    /** Node ID meta for session. */
++    private static final int NODE_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
++    /** Message tracker meta for session. */
++    private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
      /** No-op runnable. */
      private static final IgniteRunnable NOOP = new IgniteRunnable() {
          @Override public void run() {
              // No-op.
          }
      };
++    /** Shared memory workers. */
++    private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
++    /** Clients. */
++    private final ConcurrentMap<UUID, GridCommunicationClient> clients = 
GridConcurrentFactory.newMap();
++    /** Received messages count. */
++    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
++    /** Sent messages count.*/
++    private final LongAdder8 sentMsgsCnt = new LongAdder8();
++    /** Received bytes count. */
++    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
++    /** Sent bytes count.*/
++    private final LongAdder8 sentBytesCnt = new LongAdder8();
++    /** Context initialization latch. */
++    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
++    /** metrics listener. */
++    private final GridNioMetricsListener metricsLsnr = new 
GridNioMetricsListener() {
++        @Override public void onBytesSent(int bytesCnt) {
++            sentBytesCnt.add(bytesCnt);
++        }
  
--    /** Node ID message type. */
--    public static final byte NODE_ID_MSG_TYPE = -1;
--
--    /** */
--    public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
--
++        @Override public void onBytesReceived(int bytesCnt) {
++            rcvdBytesCnt.add(bytesCnt);
++        }
++    };
++    /** Client connect futures. */
++    private final ConcurrentMap<UUID, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
++        GridConcurrentFactory.newMap();
      /** */
--    public static final byte HANDSHAKE_MSG_TYPE = -3;
--
++    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
recoveryDescs = GridConcurrentFactory.newMap();
      /** */
      private ConnectGateway connectGate;
++    /** Logger. */
++    @LoggerResource
++    private IgniteLogger log;
++    /** Discovery listener. */
++    private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
++        @Override public void onEvent(Event evt) {
++            assert evt instanceof DiscoveryEvent : evt;
++            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED ;
  
++            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
++        }
++    };
++    /** Local IP address. */
++    private String locAddr;
++    /** Complex variable that represents this node IP address. */
++    private volatile InetAddress locHost;
++    /** Local port which node uses. */
++    private int locPort = DFLT_PORT;
++    /** Local port range. */
++    private int locPortRange = DFLT_PORT_RANGE;
++    /** Local port which node uses to accept shared memory connections. */
++    private int shmemPort = DFLT_SHMEM_PORT;
++    /** Allocate direct buffer or heap buffer. */
++    private boolean directBuf = true;
++    /** Allocate direct buffer or heap buffer. */
++    private boolean directSndBuf;
++    /** Idle connection timeout. */
++    private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
++    /** Connect timeout. */
++    private long connTimeout = DFLT_CONN_TIMEOUT;
++    /** Maximum connect timeout. */
++    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
++    /** Reconnect attempts count. */
++    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
++    private int reconCnt = DFLT_RECONNECT_CNT;
++    /** Socket send buffer. */
++    private int sockSndBuf = DFLT_SOCK_BUF_SIZE;
++    /** Socket receive buffer. */
++    private int sockRcvBuf = DFLT_SOCK_BUF_SIZE;
++    /** Message queue limit. */
++    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
++    /** Slow client queue limit. */
++    private int slowClientQueueLimit;
++    /** NIO server. */
++    private GridNioServer<Message> nioSrvr;
++    /** Shared memory server. */
++    private IpcSharedMemoryServerEndpoint shmemSrv;
++    /** {@code TCP_NODELAY} option value for created sockets. */
++    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
++    /** Number of received messages after which acknowledgment is sent. */
++    private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
++    /** Maximum number of unacknowledged messages. */
++    private int unackedMsgsBufSize;
++    /** Socket write timeout. */
++    private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
++    /** Recovery and idle clients handler. */
++    private CommunicationWorker commWorker;
++    /** Shared memory accept worker. */
++    private ShmemAcceptWorker shmemAcceptWorker;
++    /** SPI listener. */
++    private volatile CommunicationListener<Message> lsnr;
      /** Server listener. */
      private final GridNioServerListener<Message> srvLsnr =
          new GridNioServerListenerAdapter<Message>() {
@@@ -724,145 -724,145 +802,15 @@@
                  }
              }
          };
--
--    /** Logger. */
--    @LoggerResource
--    private IgniteLogger log;
--
--    /** Local IP address. */
--    private String locAddr;
--
--    /** Complex variable that represents this node IP address. */
--    private volatile InetAddress locHost;
--
--    /** Local port which node uses. */
--    private int locPort = DFLT_PORT;
--
--    /** Local port range. */
--    private int locPortRange = DFLT_PORT_RANGE;
--
--    /** Local port which node uses to accept shared memory connections. */
--    private int shmemPort = DFLT_SHMEM_PORT;
--
--    /** Allocate direct buffer or heap buffer. */
--    private boolean directBuf = true;
--
--    /** Allocate direct buffer or heap buffer. */
--    private boolean directSndBuf;
--
--    /** Idle connection timeout. */
--    private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
--
--    /** Connect timeout. */
--    private long connTimeout = DFLT_CONN_TIMEOUT;
--
--    /** Maximum connect timeout. */
--    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
--
--    /** Reconnect attempts count. */
--    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
--    private int reconCnt = DFLT_RECONNECT_CNT;
--
--    /** Socket send buffer. */
--    private int sockSndBuf = DFLT_SOCK_BUF_SIZE;
--
--    /** Socket receive buffer. */
--    private int sockRcvBuf = DFLT_SOCK_BUF_SIZE;
--
--    /** Message queue limit. */
--    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
--
--    /** Slow client queue limit. */
--    private int slowClientQueueLimit;
--
--    /** NIO server. */
--    private GridNioServer<Message> nioSrvr;
--
--    /** Shared memory server. */
--    private IpcSharedMemoryServerEndpoint shmemSrv;
--
--    /** {@code TCP_NODELAY} option value for created sockets. */
--    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
--
--    /** Number of received messages after which acknowledgment is sent. */
--    private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
--
--    /** Maximum number of unacknowledged messages. */
--    private int unackedMsgsBufSize;
--
--    /** Socket write timeout. */
--    private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
--
--    /** Recovery and idle clients handler. */
--    private CommunicationWorker commWorker;
--
--    /** Shared memory accept worker. */
--    private ShmemAcceptWorker shmemAcceptWorker;
--
--    /** Shared memory workers. */
--    private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
--
--    /** Clients. */
--    private final ConcurrentMap<UUID, GridCommunicationClient> clients = 
GridConcurrentFactory.newMap();
--
--    /** SPI listener. */
--    private volatile CommunicationListener<Message> lsnr;
--
      /** Bound port. */
      private int boundTcpPort = -1;
--
      /** Bound port for shared memory server. */
      private int boundTcpShmemPort = -1;
--
      /** Count of selectors to use in TCP server. */
      private int selectorsCnt = DFLT_SELECTORS_CNT;
--
      /** Address resolver. */
      private AddressResolver addrRslvr;
  
--    /** Received messages count. */
--    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
--
--    /** Sent messages count.*/
--    private final LongAdder8 sentMsgsCnt = new LongAdder8();
--
--    /** Received bytes count. */
--    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
--
--    /** Sent bytes count.*/
--    private final LongAdder8 sentBytesCnt = new LongAdder8();
--
--    /** Context initialization latch. */
--    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
--
--    /** metrics listener. */
--    private final GridNioMetricsListener metricsLsnr = new 
GridNioMetricsListener() {
--        @Override public void onBytesSent(int bytesCnt) {
--            sentBytesCnt.add(bytesCnt);
--        }
--
--        @Override public void onBytesReceived(int bytesCnt) {
--            rcvdBytesCnt.add(bytesCnt);
--        }
--    };
--
--    /** Client connect futures. */
--    private final ConcurrentMap<UUID, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
--        GridConcurrentFactory.newMap();
--
--    /** */
--    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
recoveryDescs = GridConcurrentFactory.newMap();
--
--    /** Discovery listener. */
--    private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
--        @Override public void onEvent(Event evt) {
--            assert evt instanceof DiscoveryEvent : evt;
--            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED ;
--
--            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
--        }
--    };
--
      /**
       * @return {@code True} if ssl enabled.
       */
@@@ -897,6 -897,6 +845,11 @@@
          }
      }
  
++    /** {@inheritDoc} */
++    @Override public String getLocalAddress() {
++        return locAddr;
++    }
++
      /**
       * Sets local host address for socket binding. Note that one node could 
have
       * additional addresses beside the loopback one. This configuration
@@@ -913,8 -913,8 +866,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public String getLocalAddress() {
--        return locAddr;
++    @Override public int getLocalPort() {
++        return locPort;
      }
  
      /**
@@@ -930,8 -930,8 +883,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getLocalPort() {
--        return locPort;
++    @Override public int getLocalPortRange() {
++        return locPortRange;
      }
  
      /**
@@@ -956,8 -956,8 +909,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getLocalPortRange() {
--        return locPortRange;
++    @Override public int getSharedMemoryPort() {
++        return shmemPort;
      }
  
      /**
@@@ -975,8 -975,8 +928,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getSharedMemoryPort() {
--        return shmemPort;
++    @Override public long getIdleConnectionTimeout() {
++        return idleConnTimeout;
      }
  
      /**
@@@ -993,11 -993,11 +946,6 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public long getIdleConnectionTimeout() {
--        return idleConnTimeout;
--    }
--
--    /** {@inheritDoc} */
      @Override public long getSocketWriteTimeout() {
          return sockWriteTimeout;
      }
@@@ -1049,6 -1049,6 +997,12 @@@
          this.unackedMsgsBufSize = unackedMsgsBufSize;
      }
  
++    /** {@inheritDoc} */
++    @Deprecated
++    @Override public int getConnectionBufferSize() {
++        return 0;
++    }
++
      /**
       * Sets connection buffer size. If set to {@code 0} connection buffer is 
disabled.
       *
@@@ -1064,7 -1064,7 +1018,7 @@@
  
      /** {@inheritDoc} */
      @Deprecated
--    @Override public int getConnectionBufferSize() {
++    @Override public long getConnectionBufferFlushFrequency() {
          return 0;
      }
  
@@@ -1076,9 -1076,9 +1030,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Deprecated
--    @Override public long getConnectionBufferFlushFrequency() {
--        return 0;
++    @Override public long getConnectTimeout() {
++        return connTimeout;
      }
  
      /**
@@@ -1101,8 -1101,8 +1054,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public long getConnectTimeout() {
--        return connTimeout;
++    @Override public long getMaxConnectTimeout() {
++        return maxConnTimeout;
      }
  
      /**
@@@ -1127,8 -1127,8 +1080,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public long getMaxConnectTimeout() {
--        return maxConnTimeout;
++    @Override public int getReconnectCount() {
++        return reconCnt;
      }
  
      /**
@@@ -1149,8 -1149,8 +1102,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getReconnectCount() {
--        return reconCnt;
++    @Override public boolean isDirectBuffer() {
++        return directBuf;
      }
  
      /**
@@@ -1168,11 -1168,11 +1121,6 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public boolean isDirectBuffer() {
--        return directBuf;
--    }
--
--    /** {@inheritDoc} */
      @Override public boolean isDirectSendBuffer() {
          return directSndBuf;
      }
@@@ -1189,6 -1189,6 +1137,11 @@@
          this.directSndBuf = directSndBuf;
      }
  
++    /** {@inheritDoc} */
++    @Override public int getSelectorsCount() {
++        return selectorsCnt;
++    }
++
      /**
       * Sets the count of selectors te be used in TCP server.
       * <p/>
@@@ -1202,8 -1202,8 +1155,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getSelectorsCount() {
--        return selectorsCnt;
++    @Override public boolean isTcpNoDelay() {
++        return tcpNoDelay;
      }
  
      /**
@@@ -1226,8 -1226,8 +1179,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public boolean isTcpNoDelay() {
--        return tcpNoDelay;
++    @Override public int getSocketReceiveBuffer() {
++        return sockRcvBuf;
      }
  
      /**
@@@ -1243,8 -1243,8 +1196,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getSocketReceiveBuffer() {
--        return sockRcvBuf;
++    @Override public int getSocketSendBuffer() {
++        return sockSndBuf;
      }
  
      /**
@@@ -1260,8 -1260,8 +1213,8 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getSocketSendBuffer() {
--        return sockSndBuf;
++    @Override public int getMessageQueueLimit() {
++        return msgQueueLimit;
      }
  
      /**
@@@ -1280,11 -1280,11 +1233,6 @@@
      }
  
      /** {@inheritDoc} */
--    @Override public int getMessageQueueLimit() {
--        return msgQueueLimit;
--    }
--
--    /** {@inheritDoc} */
      @Override public int getSlowClientQueueLimit() {
          return slowClientQueueLimit;
      }
@@@ -1305,6 -1305,6 +1253,12 @@@
          this.slowClientQueueLimit = slowClientQueueLimit;
      }
  
++    /** {@inheritDoc} */
++    @Deprecated
++    @Override public int getMinimumBufferedMessageCount() {
++        return 0;
++    }
++
      /**
       * Sets the minimum number of messages for this SPI, that are buffered
       * prior to sending.
@@@ -1318,17 -1318,17 +1272,6 @@@
          // No-op.
      }
  
--    /** {@inheritDoc} */
--    @Deprecated
--    @Override public int getMinimumBufferedMessageCount() {
--        return 0;
--    }
--
--    /** {@inheritDoc} */
--    @Override public void setListener(CommunicationListener<Message> lsnr) {
--        this.lsnr = lsnr;
--    }
--
      /**
       * @return Listener.
       */
@@@ -1337,6 -1337,6 +1280,11 @@@
      }
  
      /** {@inheritDoc} */
++    @Override public void setListener(CommunicationListener<Message> lsnr) {
++        this.lsnr = lsnr;
++    }
++
++    /** {@inheritDoc} */
      @Override public int getSentMessagesCount() {
          return sentMsgsCnt.intValue();
      }
@@@ -2832,732 -2844,732 +2792,732 @@@
      }
  
      /**
--     * This worker takes responsibility to shut the server down when stopping,
--     * No other thread shall stop passed server.
++     *
       */
--    private class ShmemAcceptWorker extends GridWorker {
++    private static class ConnectFuture extends 
GridFutureAdapter<GridCommunicationClient> {
          /** */
--        private final IpcSharedMemoryServerEndpoint srv;
++        private static final long serialVersionUID = 0L;
++
++        // No-op.
++    }
++
++    /**
++     *
++     */
++    private static class HandshakeTimeoutObject<T> implements 
IgniteSpiTimeoutObject {
++        /** */
++        private final IgniteUuid id = IgniteUuid.randomUuid();
++
++        /** */
++        private final T obj;
++
++        /** */
++        private final long endTime;
++
++        /** */
++        private final AtomicBoolean done = new AtomicBoolean();
  
          /**
--         * @param srv Server.
++         * @param obj Client.
++         * @param endTime End time.
           */
--        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
--            super(gridName, "shmem-communication-acceptor", 
TcpCommunicationSpi.this.log);
++        private HandshakeTimeoutObject(T obj, long endTime) {
++            assert obj != null;
++            assert obj instanceof GridCommunicationClient || obj instanceof 
SelectableChannel;
++            assert endTime > 0;
  
--            this.srv = srv;
++            this.obj = obj;
++            this.endTime = endTime;
          }
  
--        /** {@inheritDoc} */
--        @Override protected void body() throws InterruptedException {
--            try {
--                while (!Thread.interrupted()) {
--                    ShmemWorker e = new ShmemWorker(srv.accept());
--
--                    shmemWorkers.add(e);
++        /**
++         * @return {@code True} if object has not yet been timed out.
++         */
++        boolean cancel() {
++            return done.compareAndSet(false, true);
++        }
  
--                    new IgniteThread(e).start();
--                }
--            }
--            catch (IgniteCheckedException e) {
--                if (!isCancelled())
--                    U.error(log, "Shmem server failed.", e);
--            }
--            finally {
--                srv.close();
++        /** {@inheritDoc} */
++        @Override public void onTimeout() {
++            if (done.compareAndSet(false, true)) {
++                // Close socket - timeout occurred.
++                if (obj instanceof GridCommunicationClient)
++                    ((GridCommunicationClient)obj).forceClose();
++                else
++                    U.closeQuiet((AbstractInterruptibleChannel)obj);
              }
          }
  
          /** {@inheritDoc} */
--        @Override public void cancel() {
--            super.cancel();
++        @Override public long endTime() {
++            return endTime;
++        }
  
--            srv.close();
++        /** {@inheritDoc} */
++        @Override public IgniteUuid id() {
++            return id;
++        }
++
++        /** {@inheritDoc} */
++        @Override public String toString() {
++            return S.toString(HandshakeTimeoutObject.class, this);
          }
      }
  
      /**
--     *
++     * Handshake message.
       */
--    private class ShmemWorker extends GridWorker {
++    @SuppressWarnings("PublicInnerClass")
++    public static class HandshakeMessage implements Message {
          /** */
--        private final IpcEndpoint endpoint;
++        private static final long serialVersionUID = 0L;
++
++        /** */
++        private UUID nodeId;
++
++        /** */
++        private long rcvCnt;
++
++        /** */
++        private long connectCnt;
  
          /**
--         * @param endpoint Endpoint.
++         * Default constructor required by {@link Message}.
           */
--        private ShmemWorker(IpcEndpoint endpoint) {
--            super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);
--
--            this.endpoint = endpoint;
++        public HandshakeMessage() {
++            // No-op.
          }
  
--        /** {@inheritDoc} */
--        @Override protected void body() throws InterruptedException {
--            try {
--                MessageFactory msgFactory = new MessageFactory() {
--                    private MessageFactory impl;
++        /**
++         * @param nodeId Node ID.
++         * @param connectCnt Connect count.
++         * @param rcvCnt Number of received messages.
++         */
++        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
++            assert nodeId != null;
++            assert rcvCnt >= 0 : rcvCnt;
  
--                    @Nullable @Override public Message create(byte type) {
--                        if (impl == null)
--                            impl = getSpiContext().messageFactory();
++            this.nodeId = nodeId;
++            this.connectCnt = connectCnt;
++            this.rcvCnt = rcvCnt;
++        }
  
--                        assert impl != null;
++        /**
++         * @return Connect count.
++         */
++        public long connectCount() {
++            return connectCnt;
++        }
  
--                        return impl.create(type);
--                    }
--                };
++        /**
++         * @return Number of received messages.
++         */
++        public long received() {
++            return rcvCnt;
++        }
  
--                MessageFormatter msgFormatter = new MessageFormatter() {
--                    private MessageFormatter impl;
++        /**
++         * @return Node ID.
++         */
++        public UUID nodeId() {
++            return nodeId;
++        }
  
--                    @Override public MessageWriter writer() {
--                        if (impl == null)
--                            impl = getSpiContext().messageFormatter();
- 
-                         assert impl != null;
++        /** {@inheritDoc} */
++        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
++            if (buf.remaining() < 33)
++                return false;
  
-                         return impl.writer();
-                     }
 -                        assert impl != null;
++            buf.put(HANDSHAKE_MSG_TYPE);
  
-                     @Override public MessageReader reader(MessageFactory 
factory, Class<? extends Message> msgCls) {
-                         if (impl == null)
-                             impl = getSpiContext().messageFormatter();
 -                        return impl.writer();
 -                    }
++            byte[] bytes = U.uuidToBytes(nodeId);
  
 -                    @Override public MessageReader reader(MessageFactory 
factory, Class<? extends Message> msgCls) {
 -                        if (impl == null)
 -                            impl = getSpiContext().messageFormatter();
 -
--                        assert impl != null;
++            assert bytes.length == 16 : bytes.length;
  
--                        return impl.reader(factory, msgCls);
--                    }
--                };
++            buf.put(bytes);
  
--                IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
--                    metricsLsnr,
--                    log,
--                    endpoint,
--                    srvLsnr,
--                    msgFormatter,
--                    new GridNioCodecFilter(new GridDirectParser(msgFactory, 
msgFormatter), log, true),
--                    new GridConnectionBytesVerifyFilter(log)
--                );
++            buf.putLong(rcvCnt);
  
--                adapter.serve();
--            }
--            finally {
--                shmemWorkers.remove(this);
++            buf.putLong(connectCnt);
  
--                endpoint.close();
--            }
++            return true;
          }
  
          /** {@inheritDoc} */
--        @Override public void cancel() {
--            super.cancel();
++        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
++            if (buf.remaining() < 32)
++                return false;
  
--            endpoint.close();
++            byte[] nodeIdBytes = new byte[16];
++
++            buf.get(nodeIdBytes);
++
++            nodeId = U.bytesToUuid(nodeIdBytes, 0);
++
++            rcvCnt = buf.getLong();
++
++            connectCnt = buf.getLong();
++
++            return true;
          }
  
--        /** @{@inheritDoc} */
--        @Override protected void cleanup() {
--            super.cleanup();
++        /** {@inheritDoc} */
++        @Override public byte directType() {
++            return HANDSHAKE_MSG_TYPE;
++        }
  
--            endpoint.close();
++        /** {@inheritDoc} */
++        @Override public byte fieldsCount() {
++            throw new UnsupportedOperationException();
          }
  
--        /** @{@inheritDoc} */
++        /** {@inheritDoc} */
          @Override public String toString() {
--            return S.toString(ShmemWorker.class, this);
++            return S.toString(HandshakeMessage.class, this);
          }
      }
  
      /**
--     *
++     * Recovery acknowledgment message.
       */
--    private class CommunicationWorker extends IgniteSpiThread {
++    @SuppressWarnings("PublicInnerClass")
++    public static class RecoveryLastReceivedMessage implements Message {
          /** */
--        private final BlockingQueue<GridNioRecoveryDescriptor> q = new 
LinkedBlockingQueue<>();
++        private static final long serialVersionUID = 0L;
++
++        /** */
++        private long rcvCnt;
  
          /**
--         *
++         * Default constructor required by {@link Message}.
           */
--        private CommunicationWorker() {
--            super(gridName, "tcp-comm-worker", log);
++        public RecoveryLastReceivedMessage() {
++            // No-op.
          }
  
--        /** {@inheritDoc} */
--        @Override protected void body() throws InterruptedException {
--            if (log.isDebugEnabled())
--                log.debug("Tcp communication worker has been started.");
--
--            while (!isInterrupted()) {
--                GridNioRecoveryDescriptor recoveryDesc = 
q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
--
--                if (recoveryDesc != null)
--                    processRecovery(recoveryDesc);
--                else
--                    processIdle();
--            }
++        /**
++         * @param rcvCnt Number of received messages.
++         */
++        public RecoveryLastReceivedMessage(long rcvCnt) {
++            this.rcvCnt = rcvCnt;
          }
  
          /**
--         *
++         * @return Number of received messages.
           */
--        private void processIdle() {
--            cleanupRecovery();
++        public long received() {
++            return rcvCnt;
++        }
  
--            for (Map.Entry<UUID, GridCommunicationClient> e : 
clients.entrySet()) {
--                UUID nodeId = e.getKey();
++        /** {@inheritDoc} */
++        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
++            if (buf.remaining() < 9)
++                return false;
  
--                GridCommunicationClient client = e.getValue();
++            buf.put(RECOVERY_LAST_ID_MSG_TYPE);
  
--                ClusterNode node = getSpiContext().node(nodeId);
++            buf.putLong(rcvCnt);
  
--                if (node == null) {
--                    if (log.isDebugEnabled())
--                        log.debug("Forcing close of non-existent node 
connection: " + nodeId);
++            return true;
++        }
  
--                    client.forceClose();
++        /** {@inheritDoc} */
++        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
++            if (buf.remaining() < 8)
++                return false;
  
--                    clients.remove(nodeId, client);
++            rcvCnt = buf.getLong();
  
--                    continue;
--                }
++            return true;
++        }
  
--                GridNioRecoveryDescriptor recovery = null;
++        /** {@inheritDoc} */
++        @Override public byte directType() {
++            return RECOVERY_LAST_ID_MSG_TYPE;
++        }
  
--                if (client instanceof GridTcpNioCommunicationClient) {
--                    recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order()));
++        /** {@inheritDoc} */
++        @Override public byte fieldsCount() {
++            return 0;
++        }
  
--                    if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
--                        RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
++        /** {@inheritDoc} */
++        @Override public String toString() {
++            return S.toString(RecoveryLastReceivedMessage.class, this);
++        }
++    }
  
--                        if (log.isDebugEnabled())
--                            log.debug("Send recovery acknowledgement on 
timeout [rmtNode=" + nodeId +
--                                ", rcvCnt=" + msg.received() + ']');
++    /**
++     * Node ID message.
++     */
++    @SuppressWarnings("PublicInnerClass")
++    public static class NodeIdMessage implements Message {
++        /** */
++        private static final long serialVersionUID = 0L;
  
--                        
nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
++        /** */
++        private byte[] nodeIdBytes;
  
--                        recovery.lastAcknowledged(msg.received());
++        /** */
++        private byte[] nodeIdBytesWithType;
  
--                        continue;
--                    }
--                }
++        /** */
++        public NodeIdMessage() {
++            // No-op.
++        }
  
--                long idleTime = client.getIdleTime();
++        /**
++         * @param nodeId Node ID.
++         */
++        private NodeIdMessage(UUID nodeId) {
++            assert nodeId != null;
  
--                if (idleTime >= idleConnTimeout) {
--                    if (recovery != null &&
--                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
--                        !recovery.messagesFutures().isEmpty()) {
--                        if (log.isDebugEnabled())
--                            log.debug("Node connection is idle, but there are 
unacknowledged messages, " +
--                                "will wait: " + nodeId);
++            nodeIdBytes = U.uuidToBytes(nodeId);
  
--                        continue;
--                    }
++            nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
  
--                    if (log.isDebugEnabled())
--                        log.debug("Closing idle node connection: " + nodeId);
++            nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE;
  
--                    if (client.close() || client.closed())
--                        clients.remove(nodeId, client);
--                }
--            }
++            System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, 
nodeIdBytes.length);
          }
  
--        /**
--         *
--         */
--        private void cleanupRecovery() {
--            Set<ClientKey> left = null;
++        /** {@inheritDoc} */
++        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
++            assert nodeIdBytes.length == 16;
  
--            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : 
recoveryDescs.entrySet()) {
--                if (left != null && left.contains(e.getKey()))
--                    continue;
++            if (buf.remaining() < 17)
++                return false;
  
--                GridNioRecoveryDescriptor recoverySnd = e.getValue();
++            buf.put(NODE_ID_MSG_TYPE);
++            buf.put(nodeIdBytes);
  
--                if 
(!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
--                    if (left == null)
--                        left = new HashSet<>();
++            return true;
++        }
  
--                    left.add(e.getKey());
--                }
--            }
++        /** {@inheritDoc} */
++        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
++            if (buf.remaining() < 16)
++                return false;
  
--            if (left != null) {
--                assert !left.isEmpty();
++            nodeIdBytes = new byte[16];
  
--                for (ClientKey id : left) {
--                    GridNioRecoveryDescriptor recoverySnd = 
recoveryDescs.remove(id);
++            buf.get(nodeIdBytes);
  
--                    if (recoverySnd != null)
--                        recoverySnd.onNodeLeft();
--                }
--            }
++            return true;
          }
  
--        /**
--         * @param recoveryDesc Recovery descriptor.
--         */
--        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
--            ClusterNode node = recoveryDesc.node();
++        /** {@inheritDoc} */
++        @Override public byte directType() {
++            return NODE_ID_MSG_TYPE;
++        }
  
--            try {
--                if (clients.containsKey(node.id()) ||
--                    !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) 
||
--                    !getSpiContext().pingNode(node.id()))
--                    return;
--            }
--            catch (IgniteClientDisconnectedException e) {
--                if (log.isDebugEnabled())
--                    log.debug("Failed to ping node, client disconnected.");
++        /** {@inheritDoc} */
++        @Override public byte fieldsCount() {
++            return 0;
++        }
  
--                return;
--            }
++        /** {@inheritDoc} */
++        @Override public String toString() {
++            return S.toString(NodeIdMessage.class, this);
++        }
++    }
  
--            try {
--                if (log.isDebugEnabled())
--                    log.debug("Recovery reconnect [rmtNode=" + 
recoveryDesc.node().id() + ']');
++    /**
++     * This worker takes responsibility to shut the server down when stopping,
++     * No other thread shall stop passed server.
++     */
++    private class ShmemAcceptWorker extends GridWorker {
++        /** */
++        private final IpcSharedMemoryServerEndpoint srv;
  
--                GridCommunicationClient client = reserveClient(node);
++        /**
++         * @param srv Server.
++         */
++        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
++            super(gridName, "shmem-communication-acceptor", 
TcpCommunicationSpi.this.log);
  
--                client.release();
--            }
--            catch (IgniteCheckedException | IgniteException e) {
--                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
--                    if (log.isDebugEnabled())
--                        log.debug("Recovery reconnect failed, will retry " +
--                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" 
+ e + ']');
++            this.srv = srv;
++        }
  
--                    addReconnectRequest(recoveryDesc);
--                }
--                else {
--                    if (log.isDebugEnabled())
--                        log.debug("Recovery reconnect failed, " +
--                            "node left [rmtNode=" + recoveryDesc.node().id() 
+ ", err=" + e + ']');
++        /** {@inheritDoc} */
++        @Override protected void body() throws InterruptedException {
++            try {
++                while (!Thread.interrupted()) {
++                    ShmemWorker e = new ShmemWorker(srv.accept());
  
--                    onException("Recovery reconnect failed, node left 
[rmtNode=" + recoveryDesc.node().id() + "]",
--                        e);
++                    shmemWorkers.add(e);
++
++                    new IgniteThread(e).start();
                  }
              }
++            catch (IgniteCheckedException e) {
++                if (!isCancelled())
++                    U.error(log, "Shmem server failed.", e);
++            }
++            finally {
++                srv.close();
++            }
          }
  
--        /**
--         * @param recoverySnd Recovery send data.
--         */
--        void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
--            boolean add = q.add(recoverySnd);
++        /** {@inheritDoc} */
++        @Override public void cancel() {
++            super.cancel();
  
--            assert add;
++            srv.close();
          }
      }
  
      /**
       *
       */
--    private static class ConnectFuture extends 
GridFutureAdapter<GridCommunicationClient> {
++    private class ShmemWorker extends GridWorker {
          /** */
--        private static final long serialVersionUID = 0L;
++        private final IpcEndpoint endpoint;
  
--        // No-op.
--    }
++        /**
++         * @param endpoint Endpoint.
++         */
++        private ShmemWorker(IpcEndpoint endpoint) {
++            super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);
  
--    /**
--     *
--     */
--    private static class HandshakeTimeoutObject<T> implements 
IgniteSpiTimeoutObject {
--        /** */
--        private final IgniteUuid id = IgniteUuid.randomUuid();
++            this.endpoint = endpoint;
++        }
  
--        /** */
--        private final T obj;
++        /** {@inheritDoc} */
++        @Override protected void body() throws InterruptedException {
++            try {
++                MessageFactory msgFactory = new MessageFactory() {
++                    private MessageFactory impl;
  
--        /** */
--        private final long endTime;
++                    @Nullable @Override public Message create(byte type) {
++                        if (impl == null)
++                            impl = getSpiContext().messageFactory();
  
--        /** */
--        private final AtomicBoolean done = new AtomicBoolean();
++                        assert impl != null;
  
--        /**
--         * @param obj Client.
--         * @param endTime End time.
--         */
--        private HandshakeTimeoutObject(T obj, long endTime) {
--            assert obj != null;
--            assert obj instanceof GridCommunicationClient || obj instanceof 
SelectableChannel;
--            assert endTime > 0;
++                        return impl.create(type);
++                    }
++                };
  
--            this.obj = obj;
--            this.endTime = endTime;
--        }
++                MessageFormatter msgFormatter = new MessageFormatter() {
++                    private MessageFormatter impl;
  
--        /**
--         * @return {@code True} if object has not yet been timed out.
--         */
--        boolean cancel() {
--            return done.compareAndSet(false, true);
--        }
++                    @Override public MessageWriter writer() {
++                        if (impl == null)
++                            impl = getSpiContext().messageFormatter();
  
--        /** {@inheritDoc} */
--        @Override public void onTimeout() {
--            if (done.compareAndSet(false, true)) {
--                // Close socket - timeout occurred.
--                if (obj instanceof GridCommunicationClient)
--                    ((GridCommunicationClient)obj).forceClose();
--                else
--                    U.closeQuiet((AbstractInterruptibleChannel)obj);
++                        assert impl != null;
++
++                        return impl.writer();
++                    }
++
++                    @Override public MessageReader reader(MessageFactory 
factory, Class<? extends Message> msgCls) {
++                        if (impl == null)
++                            impl = getSpiContext().messageFormatter();
++
++                        assert impl != null;
++
++                        return impl.reader(factory, msgCls);
++                    }
++                };
++
++                IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
++                    metricsLsnr,
++                    log,
++                    endpoint,
++                    srvLsnr,
++                    msgFormatter,
++                    new GridNioCodecFilter(new GridDirectParser(msgFactory, 
msgFormatter), log, true),
++                    new GridConnectionBytesVerifyFilter(log)
++                );
++
++                adapter.serve();
++            }
++            finally {
++                shmemWorkers.remove(this);
++
++                endpoint.close();
              }
          }
  
          /** {@inheritDoc} */
--        @Override public long endTime() {
--            return endTime;
++        @Override public void cancel() {
++            super.cancel();
++
++            endpoint.close();
          }
  
--        /** {@inheritDoc} */
--        @Override public IgniteUuid id() {
--            return id;
++        /** @{@inheritDoc} */
++        @Override protected void cleanup() {
++            super.cleanup();
++
++            endpoint.close();
          }
  
--        /** {@inheritDoc} */
++        /** @{@inheritDoc} */
          @Override public String toString() {
--            return S.toString(HandshakeTimeoutObject.class, this);
++            return S.toString(ShmemWorker.class, this);
          }
      }
  
      /**
       *
       */
--    private class HandshakeClosure extends IgniteInClosure2X<InputStream, 
OutputStream> {
--        /** */
--        private static final long serialVersionUID = 0L;
--
++    private class CommunicationWorker extends IgniteSpiThread {
          /** */
--        private final UUID rmtNodeId;
++        private final BlockingQueue<GridNioRecoveryDescriptor> q = new 
LinkedBlockingQueue<>();
  
          /**
--         * @param rmtNodeId Remote node ID.
++         *
           */
--        private HandshakeClosure(UUID rmtNodeId) {
--            this.rmtNodeId = rmtNodeId;
++        private CommunicationWorker() {
++            super(gridName, "tcp-comm-worker", log);
          }
  
          /** {@inheritDoc} */
--        @SuppressWarnings("ThrowFromFinallyBlock")
--        @Override public void applyx(InputStream in, OutputStream out) throws 
IgniteCheckedException {
--            try {
--                // Handshake.
--                byte[] b = new byte[17];
++        @Override protected void body() throws InterruptedException {
++            if (log.isDebugEnabled())
++                log.debug("Tcp communication worker has been started.");
  
--                int n = 0;
++            while (!isInterrupted()) {
++                GridNioRecoveryDescriptor recoveryDesc = 
q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
  
--                while (n < 17) {
--                    int cnt = in.read(b, n, 17 - n);
++                if (recoveryDesc != null)
++                    processRecovery(recoveryDesc);
++                else
++                    processIdle();
++            }
++        }
  
--                    if (cnt < 0)
--                        throw new IgniteCheckedException("Failed to get 
remote node ID (end of stream reached)");
++        /**
++         *
++         */
++        private void processIdle() {
++            cleanupRecovery();
  
--                    n += cnt;
--                }
++            for (Map.Entry<UUID, GridCommunicationClient> e : 
clients.entrySet()) {
++                UUID nodeId = e.getKey();
  
--                // First 4 bytes are for length.
--                UUID id = U.bytesToUuid(b, 1);
++                GridCommunicationClient client = e.getValue();
  
--                if (!rmtNodeId.equals(id))
--                    throw new IgniteCheckedException("Remote node ID is not 
as expected [expected=" + rmtNodeId +
--                        ", rcvd=" + id + ']');
--                else if (log.isDebugEnabled())
--                    log.debug("Received remote node ID: " + id);
--            }
--            catch (SocketTimeoutException e) {
--                throw new IgniteCheckedException("Failed to perform handshake 
due to timeout (consider increasing " +
--                    "'connectionTimeout' configuration property).", e);
--            }
--            catch (IOException e) {
--                throw new IgniteCheckedException("Failed to perform 
handshake.", e);
--            }
++                ClusterNode node = getSpiContext().node(nodeId);
  
--            try {
--                ClusterNode localNode = getLocalNode();
++                if (node == null) {
++                    if (log.isDebugEnabled())
++                        log.debug("Forcing close of non-existent node 
connection: " + nodeId);
  
--                if (localNode == null)
--                    throw new IgniteSpiException("Local node has not been 
started or fully initialized " +
--                        "[isStopping=" + getSpiContext().isStopping() + ']');
++                    client.forceClose();
  
--                UUID id = localNode.id();
++                    clients.remove(nodeId, client);
  
--                NodeIdMessage msg = new NodeIdMessage(id);
++                    continue;
++                }
  
--                out.write(U.IGNITE_HEADER);
--                out.write(NODE_ID_MSG_TYPE);
--                out.write(msg.nodeIdBytes);
++                GridNioRecoveryDescriptor recovery = null;
  
--                out.flush();
++                if (client instanceof GridTcpNioCommunicationClient) {
++                    recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order()));
  
--                if (log.isDebugEnabled())
--                    log.debug("Sent local node ID [locNodeId=" + id + ", 
rmtNodeId=" + rmtNodeId + ']');
--            }
--            catch (IOException e) {
--                throw new IgniteCheckedException("Failed to perform 
handshake.", e);
--            }
--        }
--    }
++                    if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
++                        RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
  
--    /**
--     * Handshake message.
--     */
--    @SuppressWarnings("PublicInnerClass")
--    public static class HandshakeMessage implements Message {
--        /** */
--        private static final long serialVersionUID = 0L;
++                        if (log.isDebugEnabled())
++                            log.debug("Send recovery acknowledgement on 
timeout [rmtNode=" + nodeId +
++                                ", rcvCnt=" + msg.received() + ']');
  
--        /** */
--        private UUID nodeId;
++                        
nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
  
--        /** */
--        private long rcvCnt;
++                        recovery.lastAcknowledged(msg.received());
  
--        /** */
--        private long connectCnt;
++                        continue;
++                    }
++                }
  
--        /**
--         * Default constructor required by {@link Message}.
--         */
--        public HandshakeMessage() {
--            // No-op.
--        }
++                long idleTime = client.getIdleTime();
  
--        /**
--         * @param nodeId Node ID.
--         * @param connectCnt Connect count.
--         * @param rcvCnt Number of received messages.
--         */
--        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
--            assert nodeId != null;
--            assert rcvCnt >= 0 : rcvCnt;
++                if (idleTime >= idleConnTimeout) {
++                    if (recovery != null &&
++                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
++                        !recovery.messagesFutures().isEmpty()) {
++                        if (log.isDebugEnabled())
++                            log.debug("Node connection is idle, but there are 
unacknowledged messages, " +
++                                "will wait: " + nodeId);
  
--            this.nodeId = nodeId;
--            this.connectCnt = connectCnt;
--            this.rcvCnt = rcvCnt;
--        }
++                        continue;
++                    }
  
--        /**
--         * @return Connect count.
--         */
--        public long connectCount() {
--            return connectCnt;
--        }
++                    if (log.isDebugEnabled())
++                        log.debug("Closing idle node connection: " + nodeId);
  
--        /**
--         * @return Number of received messages.
--         */
--        public long received() {
--            return rcvCnt;
++                    if (client.close() || client.closed())
++                        clients.remove(nodeId, client);
++                }
++            }
          }
  
          /**
--         * @return Node ID.
++         *
           */
--        public UUID nodeId() {
--            return nodeId;
--        }
--
--        /** {@inheritDoc} */
--        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
--            if (buf.remaining() < 33)
--                return false;
--
--            buf.put(HANDSHAKE_MSG_TYPE);
--
--            byte[] bytes = U.uuidToBytes(nodeId);
--
--            assert bytes.length == 16 : bytes.length;
--
--            buf.put(bytes);
--
--            buf.putLong(rcvCnt);
--
--            buf.putLong(connectCnt);
--
--            return true;
--        }
--
--        /** {@inheritDoc} */
--        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
--            if (buf.remaining() < 32)
--                return false;
--
--            byte[] nodeIdBytes = new byte[16];
--
--            buf.get(nodeIdBytes);
--
--            nodeId = U.bytesToUuid(nodeIdBytes, 0);
--
--            rcvCnt = buf.getLong();
--
--            connectCnt = buf.getLong();
--
--            return true;
--        }
++        private void cleanupRecovery() {
++            Set<ClientKey> left = null;
  
--        /** {@inheritDoc} */
--        @Override public byte directType() {
--            return HANDSHAKE_MSG_TYPE;
--        }
++            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : 
recoveryDescs.entrySet()) {
++                if (left != null && left.contains(e.getKey()))
++                    continue;
  
--        /** {@inheritDoc} */
--        @Override public byte fieldsCount() {
--            throw new UnsupportedOperationException();
--        }
++                GridNioRecoveryDescriptor recoverySnd = e.getValue();
  
--        /** {@inheritDoc} */
--        @Override public String toString() {
--            return S.toString(HandshakeMessage.class, this);
--        }
--    }
++                if 
(!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
++                    if (left == null)
++                        left = new HashSet<>();
  
--    /**
--     * Recovery acknowledgment message.
--     */
--    @SuppressWarnings("PublicInnerClass")
--    public static class RecoveryLastReceivedMessage implements Message {
--        /** */
--        private static final long serialVersionUID = 0L;
++                    left.add(e.getKey());
++                }
++            }
  
--        /** */
--        private long rcvCnt;
++            if (left != null) {
++                assert !left.isEmpty();
  
--        /**
--         * Default constructor required by {@link Message}.
--         */
--        public RecoveryLastReceivedMessage() {
--            // No-op.
--        }
++                for (ClientKey id : left) {
++                    GridNioRecoveryDescriptor recoverySnd = 
recoveryDescs.remove(id);
  
--        /**
--         * @param rcvCnt Number of received messages.
--         */
--        public RecoveryLastReceivedMessage(long rcvCnt) {
--            this.rcvCnt = rcvCnt;
++                    if (recoverySnd != null)
++                        recoverySnd.onNodeLeft();
++                }
++            }
          }
  
          /**
--         * @return Number of received messages.
++         * @param recoveryDesc Recovery descriptor.
           */
--        public long received() {
--            return rcvCnt;
--        }
--
--        /** {@inheritDoc} */
--        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
--            if (buf.remaining() < 9)
--                return false;
++        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
++            ClusterNode node = recoveryDesc.node();
  
--            buf.put(RECOVERY_LAST_ID_MSG_TYPE);
++            try {
++                if (clients.containsKey(node.id()) ||
++                    !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) 
||
++                    !getSpiContext().pingNode(node.id()))
++                    return;
++            }
++            catch (IgniteClientDisconnectedException e) {
++                if (log.isDebugEnabled())
++                    log.debug("Failed to ping node, client disconnected.");
  
--            buf.putLong(rcvCnt);
++                return;
++            }
  
--            return true;
--        }
++            try {
++                if (log.isDebugEnabled())
++                    log.debug("Recovery reconnect [rmtNode=" + 
recoveryDesc.node().id() + ']');
  
--        /** {@inheritDoc} */
--        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
--            if (buf.remaining() < 8)
--                return false;
++                GridCommunicationClient client = reserveClient(node);
  
--            rcvCnt = buf.getLong();
++                client.release();
++            }
++            catch (IgniteCheckedException | IgniteException e) {
++                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
++                    if (log.isDebugEnabled())
++                        log.debug("Recovery reconnect failed, will retry " +
++                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" 
+ e + ']');
  
--            return true;
--        }
++                    addReconnectRequest(recoveryDesc);
++                }
++                else {
++                    if (log.isDebugEnabled())
++                        log.debug("Recovery reconnect failed, " +
++                            "node left [rmtNode=" + recoveryDesc.node().id() 
+ ", err=" + e + ']');
  
--        /** {@inheritDoc} */
--        @Override public byte directType() {
--            return RECOVERY_LAST_ID_MSG_TYPE;
++                    onException("Recovery reconnect failed, node left 
[rmtNode=" + recoveryDesc.node().id() + "]",
++                        e);
++                }
++            }
          }
  
--        /** {@inheritDoc} */
--        @Override public byte fieldsCount() {
--            return 0;
--        }
++        /**
++         * @param recoverySnd Recovery send data.
++         */
++        void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
++            boolean add = q.add(recoverySnd);
  
--        /** {@inheritDoc} */
--        @Override public String toString() {
--            return S.toString(RecoveryLastReceivedMessage.class, this);
++            assert add;
          }
      }
  
      /**
--     * Node ID message.
++     *
       */
--    @SuppressWarnings("PublicInnerClass")
--    public static class NodeIdMessage implements Message {
++    private class HandshakeClosure extends IgniteInClosure2X<InputStream, 
OutputStream> {
          /** */
          private static final long serialVersionUID = 0L;
  
          /** */
--        private byte[] nodeIdBytes;
--
--        /** */
--        private byte[] nodeIdBytesWithType;
--
--        /** */
--        public NodeIdMessage() {
--            // No-op.
--        }
++        private final UUID rmtNodeId;
  
          /**
--         * @param nodeId Node ID.
++         * @param rmtNodeId Remote node ID.
           */
--        private NodeIdMessage(UUID nodeId) {
--            assert nodeId != null;
--
--            nodeIdBytes = U.uuidToBytes(nodeId);
++        private HandshakeClosure(UUID rmtNodeId) {
++            this.rmtNodeId = rmtNodeId;
++        }
  
--            nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
++        /** {@inheritDoc} */
++        @SuppressWarnings("ThrowFromFinallyBlock")
++        @Override public void applyx(InputStream in, OutputStream out) throws 
IgniteCheckedException {
++            try {
++                // Handshake.
++                byte[] b = new byte[17];
  
--            nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE;
++                int n = 0;
  
--            System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, 
nodeIdBytes.length);
--        }
++                while (n < 17) {
++                    int cnt = in.read(b, n, 17 - n);
  
--        /** {@inheritDoc} */
--        @Override public boolean writeTo(ByteBuffer buf, MessageWriter 
writer) {
--            assert nodeIdBytes.length == 16;
++                    if (cnt < 0)
++                        throw new IgniteCheckedException("Failed to get 
remote node ID (end of stream reached)");
  
--            if (buf.remaining() < 17)
--                return false;
++                    n += cnt;
++                }
  
--            buf.put(NODE_ID_MSG_TYPE);
--            buf.put(nodeIdBytes);
++                // First 4 bytes are for length.
++                UUID id = U.bytesToUuid(b, 1);
  
--            return true;
--        }
++                if (!rmtNodeId.equals(id))
++                    throw new IgniteCheckedException("Remote node ID is not 
as expected [expected=" + rmtNodeId +
++                        ", rcvd=" + id + ']');
++                else if (log.isDebugEnabled())
++                    log.debug("Received remote node ID: " + id);
++            }
++            catch (SocketTimeoutException e) {
++                throw new IgniteCheckedException("Failed to perform handshake 
due to timeout (consider increasing " +
++                    "'connectionTimeout' configuration property).", e);
++            }
++            catch (IOException e) {
++                throw new IgniteCheckedException("Failed to perform 
handshake.", e);
++            }
  
--        /** {@inheritDoc} */
--        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
--            if (buf.remaining() < 16)
--                return false;
++            try {
++                ClusterNode localNode = getLocalNode();
  
--            nodeIdBytes = new byte[16];
++                if (localNode == null)
++                    throw new IgniteSpiException("Local node has not been 
started or fully initialized " +
++                        "[isStopping=" + getSpiContext().isStopping() + ']');
  
--            buf.get(nodeIdBytes);
++                UUID id = localNode.id();
  
--            return true;
--        }
++                NodeIdMessage msg = new NodeIdMessage(id);
  
--        /** {@inheritDoc} */
--        @Override public byte directType() {
--            return NODE_ID_MSG_TYPE;
--        }
++                out.write(U.IGNITE_HEADER);
++                out.write(NODE_ID_MSG_TYPE);
++                out.write(msg.nodeIdBytes);
  
--        /** {@inheritDoc} */
--        @Override public byte fieldsCount() {
--            return 0;
--        }
++                out.flush();
  
--        /** {@inheritDoc} */
--        @Override public String toString() {
--            return S.toString(NodeIdMessage.class, this);
++                if (log.isDebugEnabled())
++                    log.debug("Sent local node ID [locNodeId=" + id + ", 
rmtNodeId=" + rmtNodeId + ']');
++            }
++            catch (IOException e) {
++                throw new IgniteCheckedException("Failed to perform 
handshake.", e);
++            }
          }
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/d145cb70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
index d3ba2d6,87a30a6..fa4da5a
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
@@@ -42,17 -43,17 +43,13 @@@ import static org.apache.ignite.cache.C
   */
  public class IgniteCacheConfigurationTemplateTest extends 
GridCommonAbstractTest {
      /** */
--    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
--
--    /** */
      private static final String TEMPLATE1 = "org.apache.ignite*";
--
      /** */
      private static final String TEMPLATE2 = "org.apache.ignite.test.*";
--
      /** */
      private static final String TEMPLATE3 = "org.apache.ignite.test2.*";
--
++    /** */
++    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
      /** */
      private boolean clientMode;
  
@@@ -429,4 -460,4 +456,4 @@@
              assertNull(ignite.cache(TEMPLATE3));
          }
      }
--}
++}

Reply via email to