This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new a77e4e54c Refactorings for helix-view-aggregator test (#2494)
a77e4e54c is described below
commit a77e4e54c809a69ef1762e307de76037475583ce
Author: limbooverlambda <[email protected]>
AuthorDate: Tue May 23 17:26:46 2023 -0700
Refactorings for helix-view-aggregator test (#2494)
Co-authored-by: Supratik Chakraborty <[email protected]>
---
.../view/integration/TestHelixViewAggregator.java | 264 +++++++++++++--------
1 file changed, 162 insertions(+), 102 deletions(-)
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index 96890463a..d1b3bb55d 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -24,16 +24,21 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModelParser;
@@ -57,7 +62,7 @@ public class TestHelixViewAggregator extends ViewAggregatorIntegrationTestBase {
private ConfigAccessor _configAccessor;
private HelixAdmin _helixAdmin;
private MockViewClusterSpectator _monitor;
- private Set<String> _allResources = new HashSet<>();
+ private final Set<String> _allResources = new HashSet<>();
// TODO: add test coverage on multiple statemodel instances for different view clusters
private DistViewAggregatorStateModel _viewAggregatorStateModel;
@@ -91,26 +96,6 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
triggerViewAggregatorStateTransition("OFFLINE", "STANDBY");
}
- private void triggerViewAggregatorStateTransition(String fromState, String
toState)
- throws Exception {
- if
(!_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(fromState)) {
- throw new IllegalStateException(String
- .format("From state (%s) != current state (%s).", fromState,
- _viewAggregatorStateModel.getCurrentState()));
- } else if
(_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(toState)) {
- return;
- }
- NotificationContext context = new NotificationContext(null);
- Message msg = new Message(Message.MessageType.STATE_TRANSITION, "msgId");
- msg.setPartitionName(viewClusterName);
- msg.setFromState(fromState);
- msg.setToState(toState);
- Method method =
stateModelParser.getMethodForTransition(_viewAggregatorStateModel.getClass(),
- fromState, toState, new Class[] { Message.class,
NotificationContext.class });
- method.invoke(_viewAggregatorStateModel, msg, context);
- _viewAggregatorStateModel.updateState(toState);
- }
-
@AfterClass
public void afterClass() throws Exception {
_monitor.shutdown();
@@ -119,82 +104,60 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
@Test
public void testHelixViewAggregator() throws Exception {
- // Clean up initial events
- _monitor.reset();
+ initiateViewAggregator();
+ createResourceAndTriggerRebalance();
+ removeResourceFromCluster();
+ modifyViewClusterConfig();
+ simulateViewAggregatorServiceCrashRestart();
+ stopViewAggregator();
+ }
+ @Test(dependsOnMethods = "testHelixViewAggregator")
+ public void testRemoteDataRemovalAndRefresh() throws Exception {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName,
_baseAccessor);
// Start view aggregator
triggerViewAggregatorStateTransition("STANDBY", "LEADER");
-
// Wait for refresh and verify
Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- verifyViewClusterEventChanges(false, true, true);
- Set<String> allParticipantNames = new HashSet<>();
- for (MockParticipantManager participant : _allParticipants) {
- allParticipantNames.add(participant.getInstanceName());
- }
- Assert.assertEquals(
- new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
- allParticipantNames);
- Assert.assertEquals(
- new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES)),
- allParticipantNames);
- _monitor.reset();
-
- // Create resource and trigger rebalance
- createResources();
- rebalanceResources();
-
- // Wait for refresh and verify
+ // remove live instances from view cluster zk data, wait for next refresh trigger
+
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- verifyViewClusterEventChanges(true, false, false);
- Assert.assertEquals(
- new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
- _allResources);
- _monitor.reset();
-
- // Remove 1 resource from a cluster, we should get corresponding changes
in view cluster
- List<String> resourceNameList = new ArrayList<>(_allResources);
- _gSetupTool.dropResourceFromCluster(_allSourceClusters.get(0),
resourceNameList.get(0));
- rebalanceResources();
+
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size()
> 0);
- // Wait for refresh and verify
+
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- verifyViewClusterEventChanges(true, false, false);
- Assert.assertEquals(
- new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
_allResources);
- _monitor.reset();
-
- // Modify view cluster config
- _viewClusterRefreshPeriodSec = 8;
- List<PropertyType> newProperties =
- new ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
- newProperties.remove(PropertyType.LIVEINSTANCES);
- resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
+
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size()
> 0);
+ // Stop view aggregator
+ stopViewAggregator();
+ }
- // Wait for refresh and verify
- Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- verifyViewClusterEventChanges(false, false, true);
-
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
0);
- _monitor.reset();
+ private void stopViewAggregator() throws Exception {
+ triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+ }
+ private void simulateViewAggregatorServiceCrashRestart() throws Exception {
// Simulate view aggregator service crashed and got reset
- triggerViewAggregatorStateTransition("LEADER", "STANDBY");
- _viewAggregatorStateModel
- .rollbackOnError(new Message(Message.MessageType.STATE_TRANSITION,
"test"),
- new NotificationContext(null), null);
+ stopViewAggregator();
+ _viewAggregatorStateModel.rollbackOnError(
+ new Message(Message.MessageType.STATE_TRANSITION, "test"), new
NotificationContext(null),
+ null);
_viewAggregatorStateModel.updateState("ERROR");
triggerViewAggregatorStateTransition("ERROR", "OFFLINE");
// Change happened during view aggregator down
- newProperties = new
ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
+ List<PropertyType> newProperties =
+ new ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
newProperties.remove(PropertyType.INSTANCES);
resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
MockParticipantManager participant = _allParticipants.get(0);
participant.syncStop();
_helixAdmin.enableInstance(participant.getClusterName(),
participant.getInstanceName(), false);
- _gSetupTool.dropInstanceFromCluster(participant.getClusterName(),
participant.getInstanceName());
+ _gSetupTool.dropInstanceFromCluster(participant.getClusterName(),
+ participant.getInstanceName());
rebalanceResources();
+
+ Set<String> allParticipantNames = getParticipantInstanceNames();
allParticipantNames.remove(participant.getInstanceName());
// Restart helix view aggregator
@@ -202,8 +165,11 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
triggerViewAggregatorStateTransition("STANDBY", "LEADER");
// Wait for refresh and verify
- Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- verifyViewClusterEventChanges(true, true, true);
+ Predicate<MockViewClusterSpectator> checkForAllChanges =
+
hasExternalViewChanges().and(hasInstanceConfigChanges()).and(hasLiveInstanceChanges());
+ int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+ TestHelper.verify(() -> checkForAllChanges.test(_monitor), timeout);
+
Assert.assertEquals(
new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
_allResources);
@@ -211,28 +177,96 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
allParticipantNames);
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES).size(),
0);
+ }
- // Stop view aggregator
- triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+ private void modifyViewClusterConfig() throws Exception {
+ try {
+ // Modify view cluster config
+ _viewClusterRefreshPeriodSec = 8;
+ List<PropertyType> newProperties = new
ArrayList<>(ViewClusterSourceConfig.getValidPropertyTypes());
+ newProperties.remove(PropertyType.LIVEINSTANCES);
+ resetViewClusterConfig(_viewClusterRefreshPeriodSec, newProperties);
+
+ // Wait for refresh and verify
+ Predicate<MockViewClusterSpectator> checkForLiveInstanceChanges =
+
hasExternalViewChanges().negate().and(hasInstanceConfigChanges().negate()).and(hasLiveInstanceChanges());
+ int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+ TestHelper.verify(() -> checkForLiveInstanceChanges.test(_monitor),
timeout);
+
Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
+ 0);
+ } finally {
+ _monitor.reset();
+ }
}
- @Test(dependsOnMethods = "testHelixViewAggregator")
- public void testRemoteDataRemovalAndRefresh() throws Exception {
- HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName,
_baseAccessor);
- // Start view aggregator
- triggerViewAggregatorStateTransition("STANDBY", "LEADER");
- // Wait for refresh and verify
- Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
- // remove live instances from view cluster zk data, wait for next refresh
trigger
-
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
- Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size()
> 0);
+ private void removeResourceFromCluster() throws Exception {
+ try {
+ // Remove 1 resource from a cluster, we should get corresponding changes in view cluster
+ List<String> resourceNameList = new ArrayList<>(_allResources);
+ _gSetupTool.dropResourceFromCluster(_allSourceClusters.get(0),
resourceNameList.get(0));
+ rebalanceResources();
+ Predicate<MockViewClusterSpectator> checkForExternalViewChanges =
+
hasExternalViewChanges().and(hasInstanceConfigChanges().negate()).and(hasLiveInstanceChanges().negate());
+ // Wait for refresh and verify
+ int timeout = (_viewClusterRefreshPeriodSec + 5) * 1000;
+ TestHelper.verify(() -> checkForExternalViewChanges.test(_monitor),
timeout);
+ Assert.assertEquals(new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
+ _allResources);
+ } finally {
+ _monitor.reset();
+ }
+ }
-
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
- Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
-
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size()
> 0);
- // Stop view aggregator
- triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+ private void createResourceAndTriggerRebalance() throws Exception {
+ try {
+ // Create resource and trigger rebalance
+ createResources();
+ rebalanceResources();
+ // Wait for refresh and verify
+ Predicate<MockViewClusterSpectator> checkForExternalViewChanges =
+ hasExternalViewChanges().and(hasInstanceConfigChanges().negate())
+ .and(hasLiveInstanceChanges().negate());
+ int timeout = (_viewClusterRefreshPeriodSec + 10) * 1000;
+ TestHelper.verify(() -> checkForExternalViewChanges.test(_monitor),
timeout);
+ Assert.assertEquals(
+ new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.EXTERNALVIEW)),
+ _allResources);
+ } finally {
+ _monitor.reset();
+ }
+ }
+
+ private void initiateViewAggregator() throws Exception {
+ try {
+ // Clean up initial events
+ _monitor.reset();
+
+ // Start view aggregator
+ triggerViewAggregatorStateTransition("STANDBY", "LEADER");
+
+ // Wait for refresh and verify
+ Predicate<MockViewClusterSpectator> checkForNoExternalViewChanges =
+ hasExternalViewChanges().negate().and(hasInstanceConfigChanges())
+ .and(hasLiveInstanceChanges());
+ int timeout = (_viewClusterRefreshPeriodSec + 5) * 1000;
+ TestHelper.verify(() -> checkForNoExternalViewChanges.test(_monitor),
timeout);
+
+ Set<String> participantInstanceNames = getParticipantInstanceNames();
+
+ Assert.assertEquals(
+ new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES)),
+ participantInstanceNames);
+ Assert.assertEquals(
+ new
HashSet<>(_monitor.getPropertyNamesFromViewCluster(PropertyType.INSTANCES)),
+ participantInstanceNames);
+ } finally {
+ _monitor.reset();
+ }
+ }
+
+ private Set<String> getParticipantInstanceNames() {
+ return _allParticipants.stream().map(ZKHelixManager::getInstanceName)
+ .collect(Collectors.toSet());
}
private void resetViewClusterConfig(int refreshPeriod, List<PropertyType>
properties) {
@@ -247,11 +281,16 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
_configAccessor.setClusterConfig(viewClusterName, viewClusterConfig);
}
- private void verifyViewClusterEventChanges(boolean externalViewChange,
- boolean instanceConfigChange, boolean liveInstancesChange) {
- Assert.assertEquals(_monitor.getExternalViewChangeCount() > 0,
externalViewChange);
- Assert.assertEquals(_monitor.getInstanceConfigChangeCount() > 0,
instanceConfigChange);
- Assert.assertEquals(_monitor.getLiveInstanceChangeCount() > 0,
liveInstancesChange);
+ private Predicate<MockViewClusterSpectator> hasExternalViewChanges() {
+ return clusterSpectator -> clusterSpectator.getExternalViewChangeCount() >
0;
+ }
+
+ private Predicate<MockViewClusterSpectator> hasInstanceConfigChanges() {
+ return clusterSpectator -> clusterSpectator.getInstanceConfigChangeCount()
> 0;
+ }
+
+ private Predicate<MockViewClusterSpectator> hasLiveInstanceChanges() {
+ return clusterSpectator -> clusterSpectator.getLiveInstanceChangeCount() >
0;
}
/**
@@ -279,8 +318,8 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
// We always rebalance all resources, even if it would be deleted during test
// We assume rebalance will be successful
try {
- _gSetupTool
- .rebalanceResource(sourceClusterName, resourceName,
numReplicaPerResourcePartition);
+ _gSetupTool.rebalanceResource(sourceClusterName, resourceName,
+ numReplicaPerResourcePartition);
} catch (HelixException e) {
// ok
}
@@ -288,6 +327,27 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
}
}
+ private void triggerViewAggregatorStateTransition(String fromState, String
toState)
+ throws Exception {
+ if
(!_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(fromState)) {
+ throw new IllegalStateException(
+ String.format("From state (%s) != current state (%s).", fromState,
+ _viewAggregatorStateModel.getCurrentState()));
+ } else if
(_viewAggregatorStateModel.getCurrentState().equalsIgnoreCase(toState)) {
+ return;
+ }
+ NotificationContext context = new NotificationContext(null);
+ Message msg = new Message(Message.MessageType.STATE_TRANSITION, "msgId");
+ msg.setPartitionName(viewClusterName);
+ msg.setFromState(fromState);
+ msg.setToState(toState);
+ Method method =
+
stateModelParser.getMethodForTransition(_viewAggregatorStateModel.getClass(),
fromState,
+ toState, new Class[]{Message.class, NotificationContext.class});
+ method.invoke(_viewAggregatorStateModel, msg, context);
+ _viewAggregatorStateModel.updateState(toState);
+ }
+
@Override
protected int getNumSourceCluster() {
return numSourceCluster;