jiajunwang commented on a change in pull request #904: Fix the concurrent 
modification error happens during the HelixManager initHandlers() call
URL: https://github.com/apache/helix/pull/904#discussion_r395946837
 
 

 ##########
 File path: 
helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
 ##########
 @@ -466,6 +468,79 @@ public void testSessionExpiredWhenResetHandlers() throws 
Exception {
     deleteCluster(clusterName);
   }
 
+  @Test
+  public void testConcurrentInitCallbackHandlers() throws Exception {
+    final String clusterName =
+        CLUSTER_PREFIX + "_" + _className + "_" + 
TestHelper.getTestMethodName();
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+    final String spectatorName = "testConcurrentHandlerChangeSpectator";
+    try {
+      BlockingHandleNewSessionZkHelixManager helixManager =
+          new BlockingHandleNewSessionZkHelixManager(clusterName, 
spectatorName,
+              InstanceType.SPECTATOR, _gZkClient.getServers());
+      helixManager.connect();
+      // Add two mock listeners that will add more callback handlers while 
handling INIT or CALLBACK event.
+      // Note that we have to test with 2 separate listeners so one of them 
has a chance to fail if
+      // there is a concurrent modification exception.
+      helixManager.addLiveInstanceChangeListener(
+          (LiveInstanceChangeListener) (liveInstances, changeContext) -> {
+            if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+              for (LiveInstance liveInstance : liveInstances) {
+                if (liveInstance.getInstanceName().equals("localhost_1")) {
+                  try {
+                    helixManager.addCurrentStateChangeListener(
+                        (CurrentStateChangeListener) (instanceName, 
statesInfo, currentStateChangeContext) -> {
+                          // empty callback
+                        }, liveInstance.getInstanceName(), 
liveInstance.getEphemeralOwner());
+                  } catch (Exception e) {
+                    throw new HelixException(
+                        "Unexpected exception in the 
testConcurrentHandlerProcessing.", e);
+                  }
+                }
+              }
+            }
+          });
+      helixManager.addLiveInstanceChangeListener(
+          (LiveInstanceChangeListener) (liveInstances, changeContext) -> {
+            if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
+              for (LiveInstance liveInstance : liveInstances) {
+                if (liveInstance.getInstanceName().equals("localhost_2")) {
+                  try {
+                    helixManager.addCurrentStateChangeListener(
+                        (CurrentStateChangeListener) (instanceName, 
statesInfo, currentStateChangeContext) -> {
+                          // empty callback
+                        }, liveInstance.getInstanceName(), 
liveInstance.getEphemeralOwner());
+                  } catch (Exception e) {
+                    throw new HelixException(
+                        "Unexpected exception in the 
testConcurrentHandlerProcessing.", e);
+                  }
+                }
+              }
+            }
+          });
+      // Session expire will trigger all callbacks to be init. And the 
injected liveInstance
+      // listener will trigger more callbackhandlers to be registered during 
the init process.
+      ZkTestHelper.asyncExpireSession(helixManager.getZkClient());
+      // Create mock live instance znodes to trigger the internal callback 
handling logic which will
+      // modify the handler list.
+      setupLiveInstances(clusterName, new int[] { 1, 2 });
+      // Start new session handling so the manager will call the initHandler() 
for initializing all
+      // existing handlers.
+      helixManager.proceedNewSessionHandling();
+      // Ensure the new session has been processed.
+      TestHelper.verify(() -> helixManager.getHandleNewSessionEndTime() != 0, 
3000);
+      // Verify that both new mock current state callback handlers have been 
initialized normally.
+      // Note that if there is concurrent modification that cause errors, one 
of the callback will
+      // not be initialized normally.
+      for (CallbackHandler handler : helixManager.getHandlers()) {
+        Assert.assertTrue(handler.isReady(),
+            "CallbackHandler is not initialized as expected. It might be 
caused by a ConcurrentModificationException");
 
 Review comment:
   This is done by the auto format.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to