This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a5b3483a83 [Flaky-test] Fix LLCRealtimeClusterIntegrationTest.testReset (#11806)
a5b3483a83 is described below

commit a5b3483a838839e606b251f2bb6e0d0b22f75a93
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Oct 15 22:14:34 2023 -0700

    [Flaky-test] Fix LLCRealtimeClusterIntegrationTest.testReset (#11806)
---
 .../tests/BaseClusterIntegrationTestSet.java       | 63 ++++++++++++----------
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  5 +-
 .../tests/OfflineClusterIntegrationTest.java       |  5 +-
 3 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 1d2a75215f..9ce9c69959 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -23,14 +23,16 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Message;
 import org.apache.pinot.client.ResultSet;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.exception.QueryException;
@@ -605,40 +607,47 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
     }, 60_000L, errorMessage);
   }
 
-  public void testReset(TableType tableType)
-      throws Exception {
+  public void testReset(TableType tableType) {
     String rawTableName = getTableName();
+    String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
 
-    // reset the table.
-    resetTable(rawTableName, tableType, null);
-
-    // wait for all live messages clear the queue.
-    List<String> instances = 
_helixResourceManager.getServerInstancesForTable(rawTableName, tableType);
-    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+    // Reset the table.
+    // NOTE: Reset table might fail if there are pending messages, so we need 
to retry until it succeeds.
     TestUtils.waitForCondition(aVoid -> {
-      int liveMessageCount = 0;
-      for (String instanceName : instances) {
-        List<Message> messages = 
_helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true);
-        liveMessageCount += messages.size();
+      try {
+        resetTable(rawTableName, tableType, null);
+        return true;
+      } catch (IOException e) {
+        assertTrue(e.toString().contains("pending message"), "Got unexpected 
exception: " + e);
+        return false;
       }
-      return liveMessageCount == 0;
-    }, 30_000L, "Failed to wait for all segment reset messages clear helix 
state transition!");
-
-    // Check that all segment states come back to ONLINE.
+    }, 30_000L, "Failed to reset table: " + tableNameWithType);
+
+    // Wait for all the reset messages being processed.
+    IdealState idealState = 
_helixResourceManager.getTableIdealState(tableNameWithType);
+    assertNotNull(idealState, "Failed to find ideal state for table: " + 
tableNameWithType);
+    Set<String> instances = new HashSet<>();
+    for (Map<String, String> instanceStateMap : 
idealState.getRecord().getMapFields().values()) {
+      instances.addAll(instanceStateMap.keySet());
+    }
+    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
     TestUtils.waitForCondition(aVoid -> {
-      // check external view and wait for everything to come back online
-      ExternalView externalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(),
-          TableNameBuilder.forType(tableType).tableNameWithType(rawTableName));
-      for (Map<String, String> externalViewStateMap : 
externalView.getRecord().getMapFields().values()) {
-        for (String state : externalViewStateMap.values()) {
-          if 
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
-              && 
!CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
-            return false;
-          }
+      for (String instanceName : instances) {
+        if 
(!_helixDataAccessor.getChildNames(keyBuilder.messages(instanceName)).isEmpty())
 {
+          return false;
         }
       }
       return true;
-    }, 30_000L, "Failed to wait for all segments come back online");
+    }, 30_000L, "Failed to process all the reset messages");
+
+    // Wait for external view converging with ideal state.
+    TestUtils.waitForCondition(aVoid -> {
+      IdealState is = 
_helixResourceManager.getTableIdealState(tableNameWithType);
+      assertNotNull(is, "Failed to find ideal state for table: " + 
tableNameWithType);
+      ExternalView ev = 
_helixResourceManager.getTableExternalView(tableNameWithType);
+      assertNotNull(ev, "Failed to find external view for table: " + 
tableNameWithType);
+      return 
ev.getRecord().getMapFields().equals(is.getRecord().getMapFields());
+    }, 30_000L, "Failed to match the ideal state");
   }
 
   public String reloadTableAndValidateResponse(String tableName, TableType 
tableType, boolean forceDownload)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 65d2adb9b3..51b19993f1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -313,9 +313,8 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
   }
 
   @Test
-  public void testReset()
-      throws Exception {
-    super.testReset(TableType.REALTIME);
+  public void testReset() {
+    testReset(TableType.REALTIME);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index a5057d44ef..b129783170 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2972,9 +2972,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testReset()
-      throws Exception {
-    super.testReset(TableType.OFFLINE);
+  public void testReset() {
+    testReset(TableType.OFFLINE);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to