This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new a77e4e54c Refactorings for helix-view-aggregator test (#2494)
a77e4e54c is described below

commit a77e4e54c809a69ef1762e307de76037475583ce
Author: limbooverlambda <[email protected]>
AuthorDate: Tue May 23 17:26:46 2023 -0700

    Refactorings for helix-view-aggregator test (#2494)
    
    Co-authored-by: Supratik Chakraborty <[email protected]>
---
 .../view/integration/TestHelixViewAggregator.java  | 264 +++++++++++++--------
 1 file changed, 162 insertions(+), 102 deletions(-)

diff --git 
a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
 
b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index 96890463a..d1b3bb55d 100644
--- 
a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ 
b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -24,16 +24,21 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelParser;
@@ -57,7 +62,7 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
   private ConfigAccessor _configAccessor;
   private HelixAdmin _helixAdmin;
   private MockViewClusterSpectator _monitor;
-  private Set<String> _allResources = new HashSet<>();
+  private final Set<String> _allResources = new HashSet<>();
   // TODO: add test coverage on multiple statemodel instances for different 
view clusters
   private DistViewAggregatorStateModel _viewAggregatorStateModel;
 
@@ -91,26 +96,6 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
     triggerViewAggregatorStateTransition("OFFLINE", "STANDBY");
   }
 
-  private void triggerViewAggregatorStateTransition(String fromState, String 
toState)
-      throws Exception {
-    if 
(!_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(fromState)) {
-      throw new IllegalStateException(String
-          .format("From state (%s) != current state (%s).", fromState,
-              _viewAggregatorStateModel.getCurrentState()));
-    } else if 
(_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(toState)) {
-      return;
-    }
-    NotificationContext context = new NotificationContext(null);
-    Message msg = new Message(Message.MessageType.STATE_TRANSITION, "msgId");
-    msg.setPartitionName(viewClusterName);
-    msg.setFromState(fromState);
-    msg.setToState(toState);
-    Method method = 
stateModelParser.getMethodForTransition(_viewAggregatorStateModel.getClass(),
-        fromState, toState, new Class[] { Message.class, 
NotificationContext.class });
-    method.invoke(_viewAggregatorStateModel, msg, context);
-    _viewAggregatorStateModel.updateState(toState);
-  }
-
   @AfterClass
   public void afterClass() throws Exception {
     _monitor.shutdown();
@@ -119,82 +104,60 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
 
   @Test
   public void testHelixViewAggregator() throws Exception {
-    // Clean up initial events
-    _monitor.reset();
+    initiateViewAggregator();
+    createResourceAndTriggerRebalance();
+    removeResourceFromCluster();
+    modifyViewClusterConfig();
+    simulateViewAggregatorServiceCrashRestart();
+    stopViewAggregator();
+  }
 
+  @Test(dependsOnMethods = "testHelixViewAggregator")
+  public void testRemoteDataRemovalAndRefresh() throws Exception {
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName, 
_baseAccessor);
     // Start view aggregator
     triggerViewAggregatorStateTransition("STANDBY", "LEADER");
-
     // Wait for refresh and verify
     Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    verifyViewClusterEventChanges(false, true, true);
-    Set<String> allParticipantNames = new HashSet<>();
-    for (MockParticipantManager participant : _allParticipants) {
-      allParticipantNames.add(participant.getInstanceName());
-    }
-    Assert.assertEquals(
-        new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
-        allParticipantNames);
-    Assert.assertEquals(
-        new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES)),
-        allParticipantNames);
-    _monitor.reset();
-
-    // Create resource and trigger rebalance
-    createResources();
-    rebalanceResources();
-
-    // Wait for refresh and verify
+    // remove live instances from view cluster zk data, wait for next refresh 
trigger
+    
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
     Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    verifyViewClusterEventChanges(true, false, false);
-    Assert.assertEquals(
-        new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
-        _allResources);
-    _monitor.reset();
-
-    // Remove 1 resource from a cluster, we should get corresponding changes 
in view cluster
-    List<String> resourceNameList = new ArrayList<>(_allResources);
-    _gSetupTool.dropResourceFromCluster(_allSourceClusters.get(0), 
resourceNameList.get(0));
-    rebalanceResources();
+    
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size()
 > 0);
 
-    // Wait for refresh and verify
+    
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
     Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    verifyViewClusterEventChanges(true, false, false);
-    Assert.assertEquals(
-        new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)), 
_allResources);
-    _monitor.reset();
-
-    // Modify view cluster config
-    _viewClusterRefreshPeriodSec = 8;
-    List<PropertyType> newProperties =
-        new ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
-    newProperties.remove(PropertyType.LIVEINSTANCES);
-    resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
+    
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size()
 > 0);
+    // Stop view aggregator
+    stopViewAggregator();
+  }
 
-    // Wait for refresh and verify
-    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    verifyViewClusterEventChanges(false, false, true);
-    
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
 0);
-    _monitor.reset();
+  private void stopViewAggregator() throws Exception {
+    triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+  }
 
+  private void simulateViewAggregatorServiceCrashRestart() throws Exception {
     // Simulate view aggregator service crashed and got reset
-    triggerViewAggregatorStateTransition("LEADER", "STANDBY");
-    _viewAggregatorStateModel
-        .rollbackOnError(new Message(Message.MessageType.STATE_TRANSITION, 
"test"),
-            new NotificationContext(null), null);
+    stopViewAggregator();
+    _viewAggregatorStateModel.rollbackOnError(
+        new Message(Message.MessageType.STATE_TRANSITION, "test"), new 
NotificationContext(null),
+        null);
     _viewAggregatorStateModel.updateState("ERROR");
     triggerViewAggregatorStateTransition("ERROR", "OFFLINE");
 
     // Change happened during view aggregator down
-    newProperties = new 
ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
+    List<PropertyType> newProperties =
+        new ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
     newProperties.remove(PropertyType.INSTANCES);
     resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
     MockParticipantManager participant = _allParticipants.get(0);
 
     participant.syncStop();
     _helixAdmin.enableInstance(participant.getClusterName(), 
participant.getInstanceName(), false);
-    _gSetupTool.dropInstanceFromCluster(participant.getClusterName(), 
participant.getInstanceName());
+    _gSetupTool.dropInstanceFromCluster(participant.getClusterName(),
+        participant.getInstanceName());
     rebalanceResources();
+
+    Set<String> allParticipantNames = getParticipantInstanceNames();
     allParticipantNames.remove(participant.getInstanceName());
 
     // Restart helix view aggregator
@@ -202,8 +165,11 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
     triggerViewAggregatorStateTransition("STANDBY", "LEADER");
 
     // Wait for refresh and verify
-    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    verifyViewClusterEventChanges(true, true, true);
+    Predicate<MockViewClusterSpectator> checkForAllChanges =
+        
hasExternalViewChanges().and(hasInstanceConfigChanges()).and(hasLiveInstanceChanges());
+    int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+    TestHelper.verify(() -> checkForAllChanges.test(_monitor), timeout);
+
     Assert.assertEquals(
         new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
         _allResources);
@@ -211,28 +177,96 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
         new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
         allParticipantNames);
     
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES).size(),
 0);
+  }
 
-    // Stop view aggregator
-    triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+  private void modifyViewClusterConfig() throws Exception {
+    try {
+      // Modify view cluster config
+      _viewClusterRefreshPeriodSec = 8;
+      List<PropertyType> newProperties = new 
ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
+      newProperties.remove(PropertyType.LIVEINSTANCES);
+      resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
+
+      // Wait for refresh and verify
+      Predicate<MockViewClusterSpectator> checkForLiveInstanceChanges =
+          
hasExternalViewChanges().negate().and(hasInstanceConfigChanges().negate()).and(hasLiveInstanceChanges());
+      int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+      TestHelper.verify(() -> checkForLiveInstanceChanges.test(_monitor), 
timeout);
+      
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
+          0);
+    } finally {
+      _monitor.reset();
+    }
   }
 
-  @Test(dependsOnMethods = "testHelixViewAggregator")
-  public void testRemoteDataRemovalAndRefresh() throws Exception {
-    HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName, 
_baseAccessor);
-    // Start view aggregator
-    triggerViewAggregatorStateTransition("STANDBY", "LEADER");
-    // Wait for refresh and verify
-    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    // remove live instances from view cluster zk data, wait for next refresh 
trigger
-    
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
-    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size()
 > 0);
+  private void removeResourceFromCluster() throws Exception {
+    try {
+      // Remove 1 resource from a cluster, we should get corresponding changes 
in view cluster
+      List<String> resourceNameList = new ArrayList<>(_allResources);
+      _gSetupTool.dropResourceFromCluster(_allSourceClusters.get(0), 
resourceNameList.get(0));
+      rebalanceResources();
+      Predicate<MockViewClusterSpectator> checkForExternalViewChanges =
+          
hasExternalViewChanges().and(hasInstanceConfigChanges().negate()).and(hasLiveInstanceChanges().negate());
+      // Wait for refresh and verify
+      int timeout = (_viewClusterRefreshPeriodSec + 5) * 1000;
+      TestHelper.verify(() -> checkForExternalViewChanges.test(_monitor), 
timeout);
+      Assert.assertEquals(new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
+          _allResources);
+    } finally {
+      _monitor.reset();
+    }
+  }
 
-    
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
-    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-    
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size()
 > 0);
-    // Stop view aggregator
-    triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+  private void createResourceAndTriggerRebalance() throws Exception {
+    try {
+      // Create resource and trigger rebalance
+      createResources();
+      rebalanceResources();
+      // Wait for refresh and verify
+      Predicate<MockViewClusterSpectator> checkForExternalViewChanges =
+          hasExternalViewChanges().and(hasInstanceConfigChanges().negate())
+              .and(hasLiveInstanceChanges().negate());
+      int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+      TestHelper.verify(() -> checkForExternalViewChanges.test(_monitor), 
timeout);
+      Assert.assertEquals(
+          new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
+          _allResources);
+    } finally {
+      _monitor.reset();
+    }
+  }
+
+  private void initiateViewAggregator() throws Exception {
+    try {
+      // Clean up initial events
+      _monitor.reset();
+
+      // Start view aggregator
+      triggerViewAggregatorStateTransition("STANDBY", "LEADER");
+
+      // Wait for refresh and verify
+      Predicate<MockViewClusterSpectator> checkForNoExternalViewChanges =
+          hasExternalViewChanges().negate().and(hasInstanceConfigChanges())
+              .and(hasLiveInstanceChanges());
+      int timeout = (_viewClusterRefreshPeriodSec + 5) * 1000;
+      TestHelper.verify(() -> checkForNoExternalViewChanges.test(_monitor), 
timeout);
+
+      Set<String> participantInstanceNames = getParticipantInstanceNames();
+
+      Assert.assertEquals(
+          new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
+          participantInstanceNames);
+      Assert.assertEquals(
+          new 
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES)),
+          participantInstanceNames);
+    } finally {
+      _monitor.reset();
+    }
+  }
+
+  private Set<String> getParticipantInstanceNames() {
+    return _allParticipants.stream().map(ZKHelixManager::getInstanceName)
+        .collect(Collectors.toSet());
   }
 
   private void resetViewClusterConfig(int refreshPeriod, List<PropertyType> 
properties) {
@@ -247,11 +281,16 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
     _configAccessor.setClusterConfig(viewClusterName, viewClusterConfig);
   }
 
-  private void verifyViewClusterEventChanges(boolean externalViewChange,
-      boolean instanceConfigChange, boolean liveInstancesChange) {
-    Assert.assertEquals(_monitor.getExternalViewChangeCount() > 0, 
externalViewChange);
-    Assert.assertEquals(_monitor.getInstanceConfigChangeCount() > 0, 
instanceConfigChange);
-    Assert.assertEquals(_monitor.getLiveInstanceChangeCount() > 0, 
liveInstancesChange);
+  private Predicate<MockViewClusterSpectator> hasExternalViewChanges() {
+    return clusterSpectator -> clusterSpectator.getExternalViewChangeCount() > 
0;
+  }
+
+  private Predicate<MockViewClusterSpectator> hasInstanceConfigChanges() {
+    return clusterSpectator -> clusterSpectator.getInstanceConfigChangeCount() 
> 0;
+  }
+
+  private Predicate<MockViewClusterSpectator> hasLiveInstanceChanges() {
+    return clusterSpectator -> clusterSpectator.getLiveInstanceChangeCount() > 
0;
   }
 
   /**
@@ -279,8 +318,8 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
         // We always rebalance all resources, even if it would be deleted 
during test
         // We assume rebalance will be successful
         try {
-          _gSetupTool
-              .rebalanceResource(sourceClusterName, resourceName, 
numReplicaPerResourcePartition);
+          _gSetupTool.rebalanceResource(sourceClusterName, resourceName,
+              numReplicaPerResourcePartition);
         } catch (HelixException e) {
           // ok
         }
@@ -288,6 +327,27 @@ public class TestHelixViewAggregator extends 
ViewAggregatorIntegrationTestBase {
     }
   }
 
+  private void triggerViewAggregatorStateTransition(String fromState, String 
toState)
+      throws Exception {
+    if 
(!_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(fromState)) {
+      throw new IllegalStateException(
+          String.format("From state (%s) != current state (%s).", fromState,
+              _viewAggregatorStateModel.getCurrentState()));
+    } else if 
(_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(toState)) {
+      return;
+    }
+    NotificationContext context = new NotificationContext(null);
+    Message msg = new Message(Message.MessageType.STATE_TRANSITION, "msgId");
+    msg.setPartitionName(viewClusterName);
+    msg.setFromState(fromState);
+    msg.setToState(toState);
+    Method method =
+        
stateModelParser.getMethodForTransition(_viewAggregatorStateModel.getClass(), 
fromState,
+            toState, new Class[]{Message.class, NotificationContext.class});
+    method.invoke(_viewAggregatorStateModel, msg, context);
+    _viewAggregatorStateModel.updateState(toState);
+  }
+
   @Override
   protected int getNumSourceCluster() {
     return numSourceCluster;

Reply via email to