This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eeb4591ab6 IGNITE-21266 Java Thin Client: Fixed PA after cluster restart with the same topology version. (#11191)
4eeb4591ab6 is described below

commit 4eeb4591ab67bf7c0f9d7c8f738e9a0557ae1561
Author: Mikhail Petrov <32207922+petrov...@users.noreply.github.com>
AuthorDate: Thu Feb 22 12:03:20 2024 +0300

    IGNITE-21266 Java Thin Client: Fixed PA after cluster restart with the same topology version. (#11191)
---
 .../client/thin/ClientCacheAffinityContext.java    | 33 ++++++++++++++++++++--
 .../internal/client/thin/ReliableChannel.java      | 19 ++++++++++++-
 ...ientPartitionAwarenessUnstableTopologyTest.java | 27 +++++++++++++-----
 3 files changed, 68 insertions(+), 11 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index bcd681796da..de42b6532d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.client.ClientPartitionAwarenessMapper;
 import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
@@ -36,6 +37,8 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+
 /**
  * Client cache partition awareness context.
  */
@@ -68,13 +71,21 @@ public class ClientCacheAffinityContext {
     /** Caches that have been requested partition mappings for. */
     private volatile CacheMappingRequest rq;
 
+    /** Predicate to check whether a connection to the node with the specified ID is open. */
+    private final Predicate<UUID> connectionEstablishedPredicate;
+
     /**
      * @param binary Binary data processor.
      * @param factory Factory for caches with custom affinity.
      */
-    public ClientCacheAffinityContext(IgniteBinary binary, @Nullable 
ClientPartitionAwarenessMapperFactory factory) {
+    public ClientCacheAffinityContext(
+        IgniteBinary binary,
+        @Nullable ClientPartitionAwarenessMapperFactory factory,
+        Predicate<UUID> connectionEstablishedPredicate
+    ) {
         this.paMapFactory = factory;
         this.binary = binary;
+        this.connectionEstablishedPredicate = connectionEstablishedPredicate;
     }
 
     /**
@@ -88,7 +99,7 @@ public class ClientCacheAffinityContext {
         while (true) {
             TopologyNodes lastTop = this.lastTop.get();
 
-            if (lastTop == null || topVer.compareTo(lastTop.topVer) > 0) {
+            if (isTopologyOutdated(lastTop, topVer)) {
                 if (this.lastTop.compareAndSet(lastTop, new 
TopologyNodes(topVer, nodeId)))
                     return true;
             }
@@ -262,7 +273,7 @@ public class ClientCacheAffinityContext {
     protected ClientCacheAffinityMapping currentMapping() {
         TopologyNodes top = lastTop.get();
 
-        if (top == null)
+        if (isTopologyOutdated(top, NONE))
             return null;
 
         ClientCacheAffinityMapping mapping = affinityMapping;
@@ -303,6 +314,22 @@ public class ClientCacheAffinityContext {
         }
     }
 
+    /** Tells whether {@code top} is outdated: null, older than {@code srvSideTopVer}, or no node passes {@code connectionEstablishedPredicate}. */
+    private boolean isTopologyOutdated(TopologyNodes top, 
AffinityTopologyVersion srvSideTopVer) {
+        if (top == null)
+            return true;
+
+        if (srvSideTopVer.compareTo(top.topVer) > 0)
+            return true;
+
+        for (UUID topNode : top.nodes) {
+            if (connectionEstablishedPredicate.test(topNode))
+                return false;
+        }
+
+        return true;
+    }
+
     /**
      * Holder for list of nodes for topology version.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index f88a90e654e..f3ae29c0e1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -139,7 +139,12 @@ final class ReliableChannel implements AutoCloseable {
 
         partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        affinityCtx = new ClientCacheAffinityContext(binary, 
clientCfg.getPartitionAwarenessMapperFactory());
+        affinityCtx = new ClientCacheAffinityContext(
+            binary,
+            clientCfg.getPartitionAwarenessMapperFactory(),
+            this::isConnectionEstablished
+        );
+
         discoveryCtx = new ClientDiscoveryContext(clientCfg);
 
         connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
@@ -958,6 +963,18 @@ final class ReliableChannel implements AutoCloseable {
         return affinityCtx;
     }
 
+    /** Tells whether {@code nodeChannels} has a non-closed holder for {@code node} whose channel is non-null and not closed. */
+    private boolean isConnectionEstablished(UUID node) {
+        ClientChannelHolder chHolder = nodeChannels.get(node);
+
+        if (chHolder == null || chHolder.isClosed())
+            return false;
+
+        ClientChannel ch = chHolder.ch;
+
+        return ch != null && !ch.closed();
+    }
+
     /**
      * Channels holder.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index d2ba0e7e9a1..fe49303bbeb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static java.util.stream.IntStream.range;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -219,12 +220,25 @@ public class 
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
      * Test that partition awareness works when reconnecting to the new cluster (with lower topology version)
      */
     @Test
-    public void testPartitionAwarenessOnClusterRestart() throws Exception {
-        startGrids(3);
+    public void 
testPartitionAwarenessOnClusterRestartWithLowerTopologyVersion() throws 
Exception {
+        doPartitionAwarenessOnClusterRestartTest(3, 2);
+    }
+
+    /**
+     * Test that partition awareness works when reconnecting to the new cluster (with the same topology version)
+     */
+    @Test
+    public void 
testPartitionAwarenessOnClusterRestartWithSameTopologyVersion() throws 
Exception {
+        doPartitionAwarenessOnClusterRestartTest(3, 3);
+    }
+
+    /** */
+    private void doPartitionAwarenessOnClusterRestartTest(int 
initialClusterSize, int restartedClusterSize) throws Exception {
+        startGrids(initialClusterSize);
 
         awaitPartitionMapExchange();
 
-        initClient(getClientConfiguration(0, 1, 2), 0, 1, 2);
+        initClient(getClientConfiguration(range(0, 
initialClusterSize).toArray()), range(0, initialClusterSize).toArray());
 
         // Test partition awareness before cluster restart.
         testPartitionAwareness(true);
@@ -233,17 +247,16 @@ public class 
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
 
         Arrays.fill(channels, null);
 
-        // Start 2 grids, so topology version of the new cluster will be less 
then old cluster.
-        startGrids(2);
+        startGrids(restartedClusterSize);
 
         awaitPartitionMapExchange();
 
         // Send any request to failover.
-        client.cache(REPL_CACHE_NAME).put(0, 0);
+        client.cache(cacheName).put(0, 0);
 
         detectTopologyChange();
 
-        awaitChannelsInit(0, 1);
+        awaitChannelsInit(range(0, restartedClusterSize).toArray());
 
         testPartitionAwareness(true);
     }

Reply via email to