This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c521dda9 Fix race condition between instance drop and participant history update (#2073)
7c521dda9 is described below

commit 7c521dda9ff48f3a6456041d0f0335288e9c8e4f
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Tue May 3 14:38:40 2022 -0700

    Fix race condition between instance drop and participant history update (#2073)
    
    Fix race condition between instance drop and participant history update
---
 .../dataproviders/BaseControllerDataProvider.java  |  27 +++--
 .../helix/manager/zk/ParticipantManager.java       |  17 +--
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   4 +
 .../manager/MockParticipantManager.java            |   4 +-
 .../manager/TestParticipantManager.java            | 131 +++++++++++++++------
 5 files changed, 129 insertions(+), 54 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 70acf31ad..f81f4b424 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -82,8 +82,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
       .asList(HelixConstants.ChangeType.EXTERNAL_VIEW,
           HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW);
 
-  private String _clusterName = AbstractDataCache.UNKNOWN_CLUSTER;
-  private String _pipelineName = AbstractDataCache.UNKNOWN_PIPELINE;
+  private final String _clusterName;
+  private final String _pipelineName;
   private String _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
   private ClusterConfig _clusterConfig;
 
@@ -106,17 +106,17 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
 
   // Special caches
-  private CurrentStateCache _currentStateCache;
+  private final CurrentStateCache _currentStateCache;
   protected TaskCurrentStateCache _taskCurrentStateCache;
-  private InstanceMessagesCache _instanceMessagesCache;
+  private final InstanceMessagesCache _instanceMessagesCache;
 
   // Other miscellaneous caches
   private Map<String, Long> _instanceOfflineTimeMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
-  private Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
-  private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
+  private final Set<String> _disabledInstanceSet = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> 
_abnormalStateResolverMap = new HashMap<>();
-  private Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
+  private final Set<String> _timedOutInstanceDuringMaintenance = new 
HashSet<>();
   private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance 
= new HashMap<>();
 
   public BaseControllerDataProvider() {
@@ -777,12 +777,15 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
       PropertyKey propertyKey = keyBuilder.participantHistory(instance);
       ParticipantHistory history = accessor.getProperty(propertyKey);
       if (history == null) {
-        history = new ParticipantHistory(instance);
+        // this means the instance has been removed, skip history update
+        continue;
       }
       if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) {
         history.reportOffline();
-        // persist history back to ZK.
-        if (!accessor.setProperty(propertyKey, history)) {
+        // persist history back to ZK only if the node still exists
+        boolean succeed = accessor.updateProperty(propertyKey,
+            currentData -> (currentData == null) ? null : history.getRecord(), 
history);
+        if (!succeed) {
           LogUtil.logError(logger, getClusterEventId(),
               "Fails to persist participant online history back to ZK!");
         }
@@ -822,6 +825,10 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
       long timeOutWindow) {
     ParticipantHistory history =
         
accessor.getProperty(accessor.keyBuilder().participantHistory(instance));
+    if (history == null) {
+      LogUtil.logWarn(logger, getClusterEventId(), "Participant history is 
null for instance " + instance);
+      return false;
+    }
     List<Long> onlineTimestamps = history.getOnlineTimestampsAsMilliseconds();
     List<Long> offlineTimestamps = 
history.getOfflineTimestampsAsMilliseconds();
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 09d994f3a..741e5c4d8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -351,7 +351,7 @@ public class ParticipantManager {
 
     ParticipantHistory history = getHistory();
     history.reportOnline(_sessionId, _manager.getVersion());
-    persistHistory(history);
+    persistHistory(history, false);
   }
 
   /**
@@ -494,15 +494,16 @@ public class ParticipantManager {
   private ParticipantHistory getHistory() {
     PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
     ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
-    if (history == null) {
-      history = new ParticipantHistory(_instanceName);
-    }
-    return history;
+    return history == null ? new ParticipantHistory(_instanceName) : history;
   }
 
-  private void persistHistory(ParticipantHistory history) {
+  private void persistHistory(ParticipantHistory history, boolean 
skipOnEmptyPath) {
     PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
-    if (!_dataAccessor.setProperty(propertyKey, history)) {
+    boolean result = skipOnEmptyPath
+        ? _dataAccessor.updateProperty(
+            propertyKey, currentData -> (currentData == null) ? null : 
history.getRecord(), history)
+        : _dataAccessor.setProperty(propertyKey, history);
+    if (!result) {
       LOG.error("Failed to persist participant history to zk!");
     }
   }
@@ -514,7 +515,7 @@ public class ParticipantManager {
     try {
       ParticipantHistory history = getHistory();
       history.reportOffline();
-      persistHistory(history);
+      persistHistory(history, true);
     } catch (Exception e) {
       LOG.error("Failed to report participant offline.", e);
     }
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 28a9e4bcc..673536a89 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -205,6 +205,10 @@ public class ZKHelixAdmin implements HelixAdmin {
     _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, 
nodeId), true);
     
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName,
 nodeId), true);
     
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, 
nodeId), true);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.participantHistory(nodeId), new 
ParticipantHistory(nodeId));
   }
 
   @Override
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 6485801d5..e15947b61 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import 
org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import 
org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
 import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -39,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MockParticipantManager extends ClusterManager {
-  private static Logger LOG = 
LoggerFactory.getLogger(MockParticipantManager.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(MockParticipantManager.class);
 
   protected int _transDelay = 10;
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 050bc7660..95d1f452c 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -24,6 +24,9 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.MBeanServer;
@@ -31,6 +34,7 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -41,6 +45,7 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -65,15 +70,17 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 public class TestParticipantManager extends ZkTestBase {
-  private MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
-  private String clusterName = TestHelper.getTestClassName();
+  private final MBeanServer _server = 
ManagementFactory.getPlatformMBeanServer();
+  private final String _clusterName = TestHelper.getTestClassName();
+  private final ExecutorService _executor = Executors.newFixedThreadPool(1);
+
   static {
     
System.setProperty(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED,
 "true");
   }
 
   @AfterMethod
   public void afterMethod(Method testMethod, ITestContext testContext) {
-    deleteCluster(clusterName);
+    deleteCluster(_clusterName);
     super.endTest(testMethod, testContext);
   }
 
@@ -84,25 +91,24 @@ public class TestParticipantManager extends ZkTestBase {
 
   @Test
   public void simpleIntegrationTest() throws Exception {
-    int n = 1;
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
         4, // partitions per resource
-        n, // number of nodes
+        1, // number of nodes
         1, // replicas
         "MasterSlave", true); // do rebalance
 
+    String instanceName = "localhost_12918";
     HelixManager participant =
-        new ZKHelixManager(clusterName, "localhost_12918", 
InstanceType.PARTICIPANT, ZK_ADDR);
+        new ZKHelixManager(_clusterName, instanceName, 
InstanceType.PARTICIPANT, ZK_ADDR);
     
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
         new MockMSModelFactory());
     participant.connect();
 
     HelixManager controller =
-        new ZKHelixManager(clusterName, "controller_0", 
InstanceType.CONTROLLER, ZK_ADDR);
+        new ZKHelixManager(_clusterName, "controller_0", 
InstanceType.CONTROLLER, ZK_ADDR);
     controller.connect();
 
     verifyHelixManagerMetrics(InstanceType.PARTICIPANT, MonitorLevel.DEFAULT,
@@ -111,27 +117,86 @@ public class TestParticipantManager extends ZkTestBase {
         controller.getInstanceName());
 
     BestPossibleExternalViewVerifier verifier =
-        new 
BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new 
BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(_clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ParticipantHistory history = 
accessor.getProperty(keyBuilder.participantHistory(instanceName));
+    Assert.assertNotNull(history);
+    long historyModifiedTime = history.getRecord().getModifiedTime();
 
     // cleanup
     controller.disconnect();
     participant.disconnect();
 
     // verify all live-instances and leader nodes are gone
+    
Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+    Assert.assertTrue(
+        historyModifiedTime <
+            
accessor.getProperty(keyBuilder.participantHistory(instanceName)).getRecord().getModifiedTime());
+  }
+
+  @Test(invocationCount = 5)
+  public void testParticipantHistoryWithInstanceDrop() throws Exception {
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        4, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String instanceName = "localhost_12918";
+    HelixManager participant =
+        new ZKHelixManager(_clusterName, instanceName, 
InstanceType.PARTICIPANT, ZK_ADDR);
+    
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
+        new MockMSModelFactory());
+    participant.connect();
+
+    HelixManager controller =
+        new ZKHelixManager(_clusterName, "controller_0", 
InstanceType.CONTROLLER, ZK_ADDR);
+    controller.connect();
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+            .build();
+    Assert.assertTrue(verifier.verifyByPolling());
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+        new ZKHelixDataAccessor(_clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    
Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
-    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+    ParticipantHistory history = 
accessor.getProperty(keyBuilder.participantHistory(instanceName));
+    Assert.assertNotNull(history);
+
+    Future instanceDrop = _executor.submit(() -> {
+      boolean succeed = false;
+      while (!succeed) {
+        try {
+          // simulate instance drop
+          succeed = 
_baseAccessor.remove(keyBuilder.instance(instanceName).toString(), 
AccessOption.PERSISTENT);
+        } catch (Exception e) {
+          try {
+            Thread.sleep(100);
+          } catch (Exception ex) { }
+        }
+      }
+    });
+    // cleanup
+    controller.disconnect();
+    participant.disconnect();
+    instanceDrop.get(1000, TimeUnit.MILLISECONDS);
+    // ensure the history node is never created after instance drop
+    
Assert.assertNull(accessor.getProperty(keyBuilder.participantHistory(instanceName)));
   }
 
   @Test
   public void simpleIntegrationTestNeg() throws Exception {
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -141,19 +206,19 @@ public class TestParticipantManager extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(_clusterName);
     clusterConfig.getRecord()
         
.setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(),
             new ArrayList<>());
     clusterConfig.setTopologyAwareEnabled(true);
     clusterConfig.setTopology("/Rack/Sub-Rack/Host/Instance");
     clusterConfig.setFaultZoneType("Host");
-    configAccessor.setClusterConfig(clusterName, clusterConfig);
+    configAccessor.setClusterConfig(_clusterName, clusterConfig);
 
 
     String instanceName = "localhost_12918";
     HelixManager participant =
-        new ZKHelixManager(clusterName, instanceName , 
InstanceType.PARTICIPANT, ZK_ADDR);
+        new ZKHelixManager(_clusterName, instanceName , 
InstanceType.PARTICIPANT, ZK_ADDR);
     
participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
         new MockMSModelFactory());
     // We are expecting an IllegalArgumentException since the domain is not 
set.
@@ -167,16 +232,16 @@ public class TestParticipantManager extends ZkTestBase {
 
     // verify there is no live-instances created
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+        new ZKHelixDataAccessor(_clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    
Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    
Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
   }
 
   @Test // (dependsOnMethods = "simpleIntegrationTest")
   public void testMonitoringLevel() throws Exception {
     int n = 1;
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -189,7 +254,7 @@ public class TestParticipantManager extends ZkTestBase {
     HelixManager participant;
     try {
       participant =
-          new ZKHelixManager(clusterName, "localhost_12918", 
InstanceType.PARTICIPANT, ZK_ADDR);
+          new ZKHelixManager(_clusterName, "localhost_12918", 
InstanceType.PARTICIPANT, ZK_ADDR);
     } finally {
       System.clearProperty(SystemPropertyKeys.MONITOR_LEVEL);
     }
@@ -209,15 +274,15 @@ public class TestParticipantManager extends ZkTestBase {
       String instanceName) throws MalformedObjectNameException {
     // check HelixCallback Monitor
     Set<ObjectInstance> objs =
-        _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, 
instanceName), null);
+        _server.queryMBeans(buildCallbackMonitorObjectName(type, _clusterName, 
instanceName), null);
     Assert.assertEquals(objs.size(), 19);
 
     // check HelixZkClient Monitors
     objs =
-        _server.queryMBeans(buildZkClientMonitorObjectName(type, clusterName, 
instanceName), null);
+        _server.queryMBeans(buildZkClientMonitorObjectName(type, _clusterName, 
instanceName), null);
     Assert.assertEquals(objs.size(), 1);
 
-    objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, 
clusterName, instanceName),
+    objs = _server.queryMBeans(buildZkClientPathMonitorObjectName(type, 
_clusterName, instanceName),
         null);
 
     int expectedZkPathMonitor;
@@ -262,7 +327,7 @@ public class TestParticipantManager extends ZkTestBase {
 
     MockParticipantManager[] participants = new MockParticipantManager[n];
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -273,18 +338,18 @@ public class TestParticipantManager extends ZkTestBase {
 
     // start controller
     ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, 
instanceName);
       participants[i].syncStart();
     }
 
     BestPossibleExternalViewVerifier verifier =
-        new 
BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new 
BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
@@ -339,7 +404,7 @@ public class TestParticipantManager extends ZkTestBase {
 
     MockParticipantManager[] participants = new MockParticipantManager[n];
 
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
         "TestDB", // resource name prefix
         1, // resources
@@ -350,13 +415,13 @@ public class TestParticipantManager extends ZkTestBase {
 
     // start controller
     ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, 
instanceName);
       participants[i].setTransition(new 
SessionExpiryTransition(startCountdown, endCountdown));
       participants[i].syncStart();
     }
@@ -367,7 +432,7 @@ public class TestParticipantManager extends ZkTestBase {
     ZkTestHelper.expireSession(participants[0].getZkClient());
 
     BestPossibleExternalViewVerifier verifier =
-        new 
BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
+        new 
BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
             
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
             .build();
     Assert.assertTrue(verifier.verifyByPolling());
@@ -376,7 +441,7 @@ public class TestParticipantManager extends ZkTestBase {
     Assert.assertNotSame(newSessionId, oldSessionId);
 
     // assert interrupt exception error in old session
-    String errPath = PropertyPathBuilder.instanceError(clusterName, 
"localhost_12918", oldSessionId,
+    String errPath = PropertyPathBuilder.instanceError(_clusterName, 
"localhost_12918", oldSessionId,
         "TestDB0", "TestDB0_0");
     ZNRecord error = _gZkClient.readData(errPath);
     Assert.assertNotNull(error,

Reply via email to