This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d809333838 IGNITE-20963 Fix 
testClientReceivesPartitionAssignmentUpdates flakiness (#2886)
d809333838 is described below

commit d809333838768cb4c7bb2813a4c51ee1ff3f8a1f
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Wed Nov 29 06:35:54 2023 +0200

    IGNITE-20963 Fix testClientReceivesPartitionAssignmentUpdates flakiness 
(#2886)
    
    * Use `partitionAssignmentTimestamp` and `waitForCondition` to wait for 
heartbeat update reliably
    * Fix `FakePlacementDriver` to preserve `leaseStartTime`
    * Fix `PartitionAwarenessTest.initPrimaryReplicas` to use the same 
`leaseStartTime` for all nodes
---
 .../handler/ClientPrimaryReplicaTrackerTest.java   |  9 +++---
 .../ignite/client/handler/FakePlacementDriver.java | 18 +++++------
 .../ignite/internal/client/table/ClientTable.java  |  5 ++-
 .../ignite/client/PartitionAwarenessTest.java      | 36 +++++++++++++++-------
 4 files changed, 42 insertions(+), 26 deletions(-)

diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
index efd27fd0f9..416bde7435 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import 
org.apache.ignite.client.handler.ClientPrimaryReplicaTracker.PrimaryReplicasResult;
 import org.apache.ignite.internal.TestHybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -51,7 +50,7 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     public void setUp() throws Exception {
         driver = new FakePlacementDriver(PARTITIONS);
-        driver.setReplicas(List.of("s1", "s2"), TABLE_ID);
+        driver.setReplicas(List.of("s1", "s2"), TABLE_ID, 1);
 
         InternalTable internalTable = mock(InternalTable.class);
         when(internalTable.partitions()).thenReturn(PARTITIONS);
@@ -89,7 +88,7 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
         assertEquals(1, tracker.maxStartTime());
         driver.updateReplica("s3", TABLE_ID, 0, 2);
 
-        assertEquals(new HybridTimestamp(2, 0).longValue(), 
tracker.maxStartTime());
+        assertEquals(2, tracker.maxStartTime());
 
         PrimaryReplicasResult replicas = 
tracker.primaryReplicasAsync(TABLE_ID, null).join();
         assertEquals(PARTITIONS, replicas.nodeNames().size());
@@ -105,7 +104,7 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
         assertEquals(1, tracker.maxStartTime());
         driver.updateReplica(null, TABLE_ID, 1, 2);
 
-        assertEquals(new HybridTimestamp(2, 0).longValue(), 
tracker.maxStartTime());
+        assertEquals(2, tracker.maxStartTime());
 
         PrimaryReplicasResult replicas = 
tracker.primaryReplicasAsync(TABLE_ID, null).join();
         assertEquals(PARTITIONS, replicas.nodeNames().size());
@@ -138,7 +137,7 @@ class ClientPrimaryReplicaTrackerTest extends 
BaseIgniteAbstractTest {
         driver.updateReplica("update-3", TABLE_ID, 0, 15);
         driver.updateReplica("old-update-4", TABLE_ID, 0, 14);
 
-        assertEquals(new HybridTimestamp(15, 0).longValue(), 
tracker.maxStartTime());
+        assertEquals(15, tracker.maxStartTime());
 
         PrimaryReplicasResult replicas = 
tracker.primaryReplicasAsync(TABLE_ID, null).join();
         assertEquals(PARTITIONS, replicas.nodeNames().size());
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 2a349d10d0..a43b052ee0 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -38,13 +38,13 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         implements PlacementDriver {
     private final int partitions;
 
-    private final List<String> primaryReplicas;
+    private final List<ReplicaMeta> primaryReplicas;
 
     private boolean returnError;
 
     public FakePlacementDriver(int partitions) {
         this.partitions = partitions;
-        primaryReplicas = new ArrayList<>(Collections.nCopies(partitions, 
"s"));
+        primaryReplicas = new ArrayList<>(Collections.nCopies(partitions, 
getReplicaMeta("s", HybridTimestamp.MIN_VALUE.longValue())));
     }
 
     public void returnError(boolean returnError) {
@@ -54,12 +54,12 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     /**
      * Sets all primary replicas.
      */
-    public void setReplicas(List<String> replicas, int tableId) {
+    public void setReplicas(List<String> replicas, int tableId, long 
leaseStartTime) {
         assert replicas.size() == partitions;
 
         for (int partition = 0; partition < replicas.size(); partition++) {
             String replica = replicas.get(partition);
-            updateReplica(replica, tableId, partition, 
System.currentTimeMillis());
+            updateReplica(replica, tableId, partition, leaseStartTime);
         }
     }
 
@@ -67,11 +67,11 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
      * Sets primary replica for the given partition.
      */
     public void updateReplica(String replica, int tableId, int partition, long 
leaseStartTime) {
-        primaryReplicas.set(partition, replica);
+        primaryReplicas.set(partition, getReplicaMeta(replica, 
leaseStartTime));
         TablePartitionId groupId = new TablePartitionId(tableId, partition);
 
         PrimaryReplicaEventParameters params = new 
PrimaryReplicaEventParameters(
-                0, groupId, replica, new HybridTimestamp(leaseStartTime, 0));
+                0, groupId, replica, 
HybridTimestamp.hybridTimestamp(leaseStartTime));
 
         fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, params);
     }
@@ -83,7 +83,7 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
 
         return returnError
                 ? CompletableFuture.failedFuture(new 
RuntimeException("FakePlacementDriver expected error"))
-                : 
CompletableFuture.completedFuture(getReplicaMeta(primaryReplicas.get(id.partitionId())));
+                : 
CompletableFuture.completedFuture(primaryReplicas.get(id.partitionId()));
     }
 
     @Override
@@ -96,7 +96,7 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         return CompletableFuture.completedFuture(null);
     }
 
-    private static ReplicaMeta getReplicaMeta(String leaseholder) {
+    private static ReplicaMeta getReplicaMeta(String leaseholder, long 
leaseStartTime) {
         //noinspection serial
         return new ReplicaMeta() {
             @Override
@@ -111,7 +111,7 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
 
             @Override
             public HybridTimestamp getStartTime() {
-                return HybridTimestamp.MIN_VALUE;
+                return HybridTimestamp.hybridTimestamp(leaseStartTime);
             }
 
             @Override
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 1ee0508999..51ffb9e35a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -551,7 +551,10 @@ public class ClientTable implements Table {
                         }
 
                         // Returned timestamp can be newer than requested.
-                        partitionAssignment.timestamp = r.in().unpackLong();
+                        long ts = r.in().unpackLong();
+                        assert ts >= timestamp : "Returned timestamp is older 
than requested: " + ts + " < " + timestamp;
+
+                        partitionAssignment.timestamp = ts;
 
                         List<String> res = new ArrayList<>(cnt);
                         for (int i = 0; i < cnt; i++) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 052913d118..b0af679313 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.client;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -35,7 +37,9 @@ import org.apache.ignite.client.fakes.FakeIgniteTables;
 import org.apache.ignite.client.fakes.FakeInternalTable;
 import org.apache.ignite.client.handler.FakePlacementDriver;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -71,9 +75,9 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
 
     private static IgniteClient client2;
 
-    private @Nullable String lastOp;
+    private volatile @Nullable String lastOp;
 
-    private @Nullable String lastOpServerName;
+    private volatile @Nullable String lastOpServerName;
 
     private static final AtomicInteger nextTableId = new AtomicInteger(101);
 
@@ -166,6 +170,8 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testClientReceivesPartitionAssignmentUpdates(boolean 
useHeartbeat) throws InterruptedException {
+        ReliableChannel ch = IgniteTestUtils.getFieldValue(client2, "ch");
+
         // Check default assignment.
         RecordView<Tuple> recordView = defaultTable().recordView();
 
@@ -173,19 +179,25 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
         assertOpOnNode(nodeKey2, "get", x -> recordView.get(null, 
Tuple.create().set("ID", 2L)));
 
         // Update partition assignment.
+        var oldTs = ch.partitionAssignmentTimestamp();
         initPrimaryReplicas(reversedReplicas());
 
         if (useHeartbeat) {
             // Wait for heartbeat message to receive change notification flag.
-            Thread.sleep(500);
+            assertTrue(IgniteTestUtils.waitForCondition(() -> 
ch.partitionAssignmentTimestamp() > oldTs, 3000));
         } else {
-            // Perform a request on the default channel to receive change 
notification flag.
-            // Use two requests because of round-robin.
-            client2.tables().tables();
-            client2.tables().tables();
+            // Perform requests to receive change notification flag.
+            int maxRequests = 50;
+            while (ch.partitionAssignmentTimestamp() <= oldTs && maxRequests-- 
> 0) {
+                client2.tables().tables();
+            }
+
+            assertThat("Failed to receive assignment update", maxRequests, 
greaterThan(0));
         }
 
         // Check new assignment.
+        assertThat(ch.partitionAssignmentTimestamp(), greaterThan(oldTs));
+
         assertOpOnNode(nodeKey2, "get", x -> recordView.get(null, 
Tuple.create().set("ID", 1L)));
         assertOpOnNode(nodeKey1, "get", x -> recordView.get(null, 
Tuple.create().set("ID", 2L)));
     }
@@ -611,16 +623,18 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
     }
 
     private static void initPrimaryReplicas(@Nullable List<String> replicas) {
-        initPrimaryReplicas(testServer.placementDriver(), replicas);
-        initPrimaryReplicas(testServer2.placementDriver(), replicas);
+        long leaseStartTime = new HybridClockImpl().nowLong();
+
+        initPrimaryReplicas(testServer.placementDriver(), replicas, 
leaseStartTime);
+        initPrimaryReplicas(testServer2.placementDriver(), replicas, 
leaseStartTime);
     }
 
-    private static void initPrimaryReplicas(FakePlacementDriver 
placementDriver, @Nullable List<String> replicas) {
+    private static void initPrimaryReplicas(FakePlacementDriver 
placementDriver, @Nullable List<String> replicas, long leaseStartTime) {
         if (replicas == null) {
             replicas = defaultReplicas();
         }
 
-        placementDriver.setReplicas(replicas, nextTableId.get() - 1);
+        placementDriver.setReplicas(replicas, nextTableId.get() - 1, 
leaseStartTime);
     }
 
     private static List<String> defaultReplicas() {

Reply via email to