Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 7f73a2aaa -> c1c644b9c


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1c644b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1c644b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1c644b9

Branch: refs/heads/ignite-zk
Commit: c1c644b9cdc51a0e187fa3b5e6ec4610e3235cc0
Parents: 7f73a2a
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 21 13:45:17 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 21 14:11:32 2017 +0300

----------------------------------------------------------------------
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   8 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |   2 +-
 .../zk/internal/ZkNodeValidateResult.java       |  10 +-
 .../discovery/zk/internal/ZookeeperClient.java  |   3 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 342 +++++++++++++------
 5 files changed, 246 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c644b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index 5967e1c..89f7b42 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -40,6 +40,9 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
     final int dataForJoinedPartCnt;
 
     /** */
+    final int secSubjPartCnt;
+
+    /** */
     final UUID joinDataPrefixId;
 
     /** */
@@ -53,6 +56,7 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
      * @param joinDataPrefixId Join data unique prefix.
      * @param joinDataPartCnt Join data part count.
      * @param dataForJoinedPartCnt Data for joined part count.
+     * @param secSubjPartCnt Security subject part count.
      */
     ZkDiscoveryNodeJoinEventData(long evtId,
         long topVer,
@@ -60,7 +64,8 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
         int joinedInternalId,
         UUID joinDataPrefixId,
         int joinDataPartCnt,
-        int dataForJoinedPartCnt)
+        int dataForJoinedPartCnt,
+        int secSubjPartCnt)
     {
         super(evtId, EventType.EVT_NODE_JOINED, topVer);
 
@@ -69,6 +74,7 @@ class ZkDiscoveryNodeJoinEventData extends 
ZkDiscoveryEventData {
         this.joinDataPrefixId = joinDataPrefixId;
         this.joinDataPartCnt = joinDataPartCnt;
         this.dataForJoinedPartCnt = dataForJoinedPartCnt;
+        this.secSubjPartCnt = secSubjPartCnt;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c644b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 818df75..ba3872c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -292,7 +292,7 @@ class ZkIgnitePaths {
      * @param evtId Event ID.
      * @return Event zk path.
      */
-    String joinEventSecuritySubject(long evtId) {
+    String joinEventSecuritySubjectPath(long evtId) {
         return evtsPath + "/s-" + evtId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c644b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
index 52383d7..2abfee3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import java.io.Serializable;
-
 /**
  *
  */
@@ -27,7 +25,7 @@ class ZkNodeValidateResult {
     String err;
 
     /** */
-    Serializable secSubj;
+    byte[] secSubjZipBytes;
 
     /**
      * @param err Error.
@@ -37,9 +35,9 @@ class ZkNodeValidateResult {
     }
 
     /**
-     * @param secSubj Node security subject.
+     * @param secSubjZipBytes Zip-compressed marshalled security subject.
      */
-    ZkNodeValidateResult(Serializable secSubj) {
-        this.secSubj = secSubj;
+    ZkNodeValidateResult(byte[] secSubjZipBytes) {
+        this.secSubjZipBytes = secSubjZipBytes;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c644b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 5c0bd58..9cd55d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -284,7 +284,8 @@ public class ZookeeperClient implements Watcher {
     /**
      * @param path Path.
      * @param data Data.
-     * @return {@code True}
+     * @param overhead Extra overhead.
+     * @return {@code True} if data size exceeds max request size and should 
be split into multiple parts.
      */
     boolean needSplitNodeData(String path, byte[] data, int overhead) {
         return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c644b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index f8fe421..6c9a216 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -35,6 +36,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -59,6 +64,7 @@ import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -655,43 +661,6 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode 
node) throws IgniteCheckedException {
-        byte[] credBytes = 
(byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-        if (credBytes == null)
-            return null;
-
-        return U.unmarshal(marsh, credBytes, null);
-    }
-
-    /**
-     * Marshalls credentials with discovery SPI marshaller (will replace 
attribute value).
-     *
-     * @param node Node to marshall credentials for.
-     * @throws IgniteSpiException If marshalling failed.
-     */
-    private void marshalCredentials(ZookeeperClusterNode node) throws 
IgniteSpiException {
-        try {
-            // Use security-unsafe getter.
-            Map<String, Object> attrs0 = node.getAttributes();
-
-            Object creds = 
attrs0.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-            if (creds != null) {
-                Map<String, Object> attrs = new HashMap<>(attrs0);
-
-                assert !(creds instanceof byte[]);
-
-                attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, 
U.marshal(marsh, creds));
-
-                node.setAttributes(attrs);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal node security 
credentials: " + node.id(), e);
-        }
-    }
-
     /**
      * @param reconnect  {@code True} if client node reconnects.
      * @param prevJoined {@code True} if reconnect after already joined 
topology
@@ -704,22 +673,10 @@ public class ZookeeperDiscoveryImpl {
         if (internalLsnr != null)
             internalLsnr.beforeJoin(log);
 
-        if (!locNode.isClient()) {
-            DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
-
-            if (nodeAuth != null && nodeAuth.isGlobalNodeAuthentication()) {
-                SecurityCredentials locCred = 
(SecurityCredentials)locNode.getAttributes()
-                    .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-                localAuthentication(nodeAuth, locCred);
-            }
-        }
-        else {
-            if (reconnect)
-                locNode.setAttributes(spi.getSpiContext().nodeAttributes());
-        }
+        if (locNode.isClient() && reconnect)
+            locNode.setAttributes(spi.getSpiContext().nodeAttributes());
 
-        marshalCredentials(locNode);
+        marshalCredentialsOnJoin(locNode);
 
         rtState = new ZkRuntimeState(prevJoined);
 
@@ -753,37 +710,6 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * Authenticate local node.
-     *
-     * @param nodeAuth Authenticator.
-     * @param locCred Local security credentials for authentication.
-     * @throws IgniteSpiException If any error occurs.
-     */
-    private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, 
SecurityCredentials locCred){
-        assert nodeAuth != null;
-        assert locCred != null;
-
-        try {
-            SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
-
-            if (subj == null)
-                throw new IgniteSpiException("Authentication failed for local 
node.");
-
-            if (!(subj instanceof Serializable))
-                throw new IgniteSpiException("Authentication subject is not 
Serializable.");
-
-            Map<String, Object> attrs = new HashMap<>(locNode.attributes());
-
-            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, 
U.marshal(marsh, subj));
-
-            locNode.setAttributes(attrs);
-
-        } catch (IgniteException | IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to authenticate local node 
(will shutdown local node).", e);
-        }
-    }
-
-    /**
      * @throws InterruptedException If interrupted.
      */
     private void initZkNodes() throws InterruptedException {
@@ -859,7 +785,6 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.deleteIfExistsAsync(path);
         }
-
     }
 
     /**
@@ -1025,6 +950,94 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * Authenticate local node.
+     *
+     * @param nodeAuth Authenticator.
+     * @param locCred Local security credentials for authentication.
+     * @throws IgniteSpiException If any error occurs.
+     */
+    private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, 
SecurityCredentials locCred){
+        assert nodeAuth != null;
+        assert locCred != null;
+
+        try {
+            SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
+
+            if (subj == null)
+                throw new IgniteSpiException("Authentication failed for local 
node.");
+
+            if (!(subj instanceof Serializable))
+                throw new IgniteSpiException("Authentication subject is not 
Serializable.");
+
+            Map<String, Object> attrs = new HashMap<>(locNode.attributes());
+
+            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, 
U.marshal(marsh, subj));
+
+            locNode.setAttributes(attrs);
+
+        } catch (IgniteException | IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to authenticate local node 
(will shutdown local node).", e);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param zipBytes Zip-compressed marshalled security subject.
+     * @throws Exception If failed.
+     */
+    private void setNodeSecuritySubject(ZookeeperClusterNode node, byte[] 
zipBytes) throws Exception {
+        assert zipBytes != null;
+
+        Map<String, Object> attrs = new HashMap<>(node.getAttributes());
+
+        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, 
unzip(zipBytes));
+
+        node.setAttributes(attrs);
+    }
+
+    /**
+     * @param node Node.
+     * @return Credentials.
+     * @throws Exception If failed to unmarshal.
+     */
+    private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode 
node) throws Exception {
+        byte[] credBytes = 
(byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+        if (credBytes == null)
+            return null;
+
+        return unmarshalZip(credBytes);
+    }
+
+    /**
+     * Marshals and zip-compresses credentials with discovery SPI marshaller (will replace 
attribute value).
+     *
+     * @param node Node to marshal credentials for.
+     * @throws IgniteSpiException If marshalling failed.
+     */
+    private void marshalCredentialsOnJoin(ZookeeperClusterNode node) throws 
IgniteSpiException {
+        try {
+            // Use security-unsafe getter.
+            Map<String, Object> attrs0 = node.getAttributes();
+
+            Object creds = 
attrs0.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+            if (creds != null) {
+                Map<String, Object> attrs = new HashMap<>(attrs0);
+
+                assert !(creds instanceof byte[]);
+
+                attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, 
marshalZip(creds));
+
+                node.setAttributes(attrs);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal node security 
credentials: " + node.id(), e);
+        }
+    }
+
+    /**
      *
      */
     private class CheckJoinStateTimeoutObject extends ZkAbstractWatcher
@@ -1252,16 +1265,14 @@ public class ZookeeperDiscoveryImpl {
 
             DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
 
-            if (nodeAuth != null && !nodeAuth.isGlobalNodeAuthentication()) {
+            if (nodeAuth != null) {
                 try {
                     localAuthentication(nodeAuth, 
unmarshalCredentials(locNode));
                 }
                 catch (Exception e) {
                     U.warn(log, "Local node authentication failed: " + e, e);
 
-                    rtState.onCloseStart(e);
-
-                    joinFut.onDone(e);
+                    onSegmented(e);
 
                     // Stop any further processing.
                     throw new ZookeeperClientFailedException("Local node 
authentication failed: " + e);
@@ -1515,7 +1526,11 @@ public class ZookeeperDiscoveryImpl {
 
                 assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-                generateNodeJoin(curTop, joiningNodeData, internalId, 
prefixId);
+                generateNodeJoin(curTop,
+                    joiningNodeData,
+                    internalId,
+                    prefixId,
+                    validateRes.secSubjZipBytes);
 
                 watchAliveNodeData(aliveNodePath);
 
@@ -1606,7 +1621,7 @@ public class ZookeeperDiscoveryImpl {
         DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
 
         if (nodeAuth == null)
-            return new ZkNodeValidateResult(null);
+            return new ZkNodeValidateResult((byte[])null);
 
         SecurityCredentials cred;
 
@@ -1640,7 +1655,18 @@ public class ZookeeperDiscoveryImpl {
             return new ZkNodeValidateResult("Authentication subject is not 
serializable");
         }
 
-        return new ZkNodeValidateResult((Serializable)subj);
+        byte[] secSubjZipBytes;
+
+        try {
+            secSubjZipBytes = marshalZip(subj);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to marshal node security subject: " + e, e);
+
+            return new ZkNodeValidateResult("Failed to marshal node security 
subject");
+        }
+
+        return new ZkNodeValidateResult(secSubjZipBytes);
     }
 
     /**
@@ -1694,13 +1720,15 @@ public class ZookeeperDiscoveryImpl {
      * @param joiningNodeData Join data.
      * @param internalId Joined node internal ID.
      * @param prefixId Unique path prefix.
+     * @param secSubjZipBytes Zip-compressed marshalled security subject (may be {@code null}).
      * @throws Exception If failed.
      */
     private void generateNodeJoin(
         TreeMap<Long, ZookeeperClusterNode> curTop,
         ZkJoiningNodeData joiningNodeData,
         int internalId,
-        UUID prefixId)
+        UUID prefixId,
+        @Nullable byte[] secSubjZipBytes)
         throws Exception
     {
         ZookeeperClusterNode joinedNode = joiningNodeData.node();
@@ -1741,19 +1769,18 @@ public class ZookeeperDiscoveryImpl {
 
         int overhead = 5;
 
-        String dataPathForJoined = zkPaths.joinEventDataPathForJoined(evtId);
+        int dataForJoinedPartCnt = 
saveData(zkPaths.joinEventDataPathForJoined(evtId),
+            dataForJoinedBytes,
+            overhead);
 
-        int dataForJoinedPartCnt = 1;
+        int secSubjPartCnt = 0;
 
-        if (rtState.zkClient.needSplitNodeData(dataPathForJoined, 
dataForJoinedBytes, overhead)) {
-            dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient,
-                dataPathForJoined,
-                rtState.zkClient.splitNodeData(dataPathForJoined, 
dataForJoinedBytes, overhead));
-        }
-        else {
-            
rtState.zkClient.createIfNeeded(multipartPathName(dataPathForJoined, 0),
-                dataForJoinedBytes,
-                PERSISTENT);
+        if (secSubjZipBytes != null) {
+            secSubjPartCnt = 
saveData(zkPaths.joinEventSecuritySubjectPath(evtId), secSubjZipBytes, 
overhead);
+
+            assert secSubjPartCnt > 0 : secSubjPartCnt;
+
+            setNodeSecuritySubject(joinedNode, secSubjZipBytes);
         }
 
         long addDataTime = System.currentTimeMillis() - addDataStart;
@@ -1765,7 +1792,8 @@ public class ZookeeperDiscoveryImpl {
             joinedNode.internalId(),
             prefixId,
             joiningNodeData.partCount(),
-            dataForJoinedPartCnt);
+            dataForJoinedPartCnt,
+            secSubjPartCnt);
 
         evtData.joiningNodeData = joiningNodeData;
 
@@ -1782,6 +1810,30 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param path Path to save.
+     * @param bytes Bytes to save.
+     * @param overhead Extra overhead.
+     * @return Parts count.
+     * @throws Exception If failed.
+     */
+    private int saveData(String path, byte[] bytes, int overhead) throws 
Exception {
+        int dataForJoinedPartCnt = 1;
+
+        if (rtState.zkClient.needSplitNodeData(path, bytes, overhead)) {
+            dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient,
+                path,
+                rtState.zkClient.splitNodeData(path, bytes, overhead));
+        }
+        else {
+            rtState.zkClient.createIfNeeded(multipartPathName(path, 0),
+                bytes,
+                PERSISTENT);
+        }
+
+        return dataForJoinedPartCnt;
+    }
+
+    /**
      * @param locInternalId Local node internal ID.
      * @throws Exception If failed.
      */
@@ -2161,6 +2213,9 @@ public class ZookeeperDiscoveryImpl {
                             exchange.onExchange(dataBag);
                         }
 
+                        if (evtData0.secSubjPartCnt > 0 && 
joiningData.node().attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2) == 
null)
+                            readAndInitSecuritySubject(joiningData.node(), 
evtData0);
+
                         notifyNodeJoin(evtData0, joiningData);
 
                         break;
@@ -2253,6 +2308,21 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param node Node to set security subject attribute for.
+     * @param evtData Node join event data.
+     * @throws Exception If failed.
+     */
+    private void readAndInitSecuritySubject(ZookeeperClusterNode node, 
ZkDiscoveryNodeJoinEventData evtData) throws Exception {
+        if (evtData.secSubjPartCnt > 0) {
+            byte[] zipBytes = readMultipleParts(rtState.zkClient,
+                zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
+                evtData.secSubjPartCnt);
+
+            setNodeSecuritySubject(node, zipBytes);
+        }
+    }
+
+    /**
      * @param evtsData Events data.
      * @param evtData Local join event data.
      * @throws Exception If failed.
@@ -2277,6 +2347,8 @@ public class ZookeeperDiscoveryImpl {
         locNode.internalId(evtData.joinedInternalId);
         locNode.order(evtData.topologyVersion());
 
+        readAndInitSecuritySubject(locNode, evtData);
+
         DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id());
 
         dataBag.commonData(dataForJoined.discoveryData());
@@ -2317,7 +2389,7 @@ public class ZookeeperDiscoveryImpl {
                 Collections.<Long, Collection<ClusterNode>>emptyMap(),
                 null);
 
-            U.quietAndWarn(log, "Client node was reconnected after it was 
already considered failed.");
+            U.quietAndWarn(log, "Client node was reconnected after it was 
already considered failed [locId=" + locNode.id() + ']');
         }
 
         joinFut.onDone();
@@ -2994,6 +3066,12 @@ public class ZookeeperDiscoveryImpl {
         deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId, 
evtData.joinDataPartCnt);
 
         deleteDataForJoinedAsync(evtData);
+
+        if (evtData.secSubjPartCnt > 0) {
+            deleteMultiplePartsAsync(rtState.zkClient,
+                zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
+                evtData.secSubjPartCnt);
+        }
     }
 
     /**
@@ -3188,14 +3266,16 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param bytes Bytes.
+     * @param zipBytes Zip-compressed bytes.
      * @return Unmarshalled object.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> T unmarshalZip(byte[] bytes) throws IgniteCheckedException {
-        assert bytes != null && bytes.length > 0;
+    private <T> T unmarshalZip(byte[] zipBytes) throws Exception {
+        assert zipBytes != null && zipBytes.length > 0;
+
+        InflaterInputStream in = new InflaterInputStream(new 
ByteArrayInputStream(zipBytes));
 
-        return U.unmarshalZip(marsh, bytes, 
U.resolveClassLoader(spi.ignite().configuration()));
+        return marsh.unmarshal(in, 
U.resolveClassLoader(spi.ignite().configuration()));
     }
 
     /**
@@ -3206,7 +3286,49 @@ public class ZookeeperDiscoveryImpl {
     byte[] marshalZip(Object obj) throws IgniteCheckedException {
         assert obj != null;
 
-        return U.zip(marsh.marshal(obj));
+        return zip(marsh.marshal(obj));
+    }
+
+    static byte[] zip(byte[] bytes) {
+        Deflater deflater = new Deflater();
+
+        deflater.setInput(bytes);
+        deflater.finish();
+
+        GridByteArrayOutputStream out = new 
GridByteArrayOutputStream(bytes.length);
+
+        final byte[] buf = new byte[bytes.length];
+
+        while (!deflater.finished()) {
+            int cnt = deflater.deflate(buf);
+
+            out.write(buf, 0, cnt);
+        }
+
+        return out.toByteArray();
+    }
+
+    /**
+     * @param zipBytes Zip-compressed bytes.
+     * @return Uncompressed bytes.
+     * @throws DataFormatException If compressed data format is invalid.
+     */
+    public static byte[] unzip(byte[] zipBytes) throws DataFormatException {
+        Inflater inflater = new Inflater();
+
+        inflater.setInput(zipBytes);
+
+        GridByteArrayOutputStream out = new 
GridByteArrayOutputStream(zipBytes.length * 2);
+
+        final byte[] buf = new byte[zipBytes.length];
+
+        while (!inflater.finished()) {
+            int cnt = inflater.inflate(buf);
+
+            out.write(buf, 0, cnt);
+        }
+
+        return out.toByteArray();
     }
 
     /**

Reply via email to