Updated Branches: refs/heads/master 86f63b9e4 -> 5d3924106
HELIX-134: refactor helix manager to handle zk session expiry more reliably. add more tests for new cluster manager impl Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5d392410 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5d392410 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5d392410 Branch: refs/heads/master Commit: 5d39241061b5c5207cb0bc4177e1a8f725c598f8 Parents: 86f63b9 Author: zzhang <[email protected]> Authored: Wed Jul 31 11:42:28 2013 -0700 Committer: zzhang <[email protected]> Committed: Wed Jul 31 11:42:28 2013 -0700 ---------------------------------------------------------------------- .../manager/ClusterControllerManager.java | 103 ++++ .../manager/MockParticipantManager.java | 10 +- .../manager/TestAbstractManager.java | 43 -- .../manager/TestZkCallbackHandlerLeak.java | 592 +++++++++++++++++++ .../integration/manager/ZkTestManager.java | 36 ++ 5 files changed, 740 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d392410/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java new file mode 100644 index 0000000..ef2d013 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java @@ -0,0 +1,103 @@ +package org.apache.helix.integration.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.helix.manager.zk.CallbackHandler; +import org.apache.helix.manager.zk.ControllerManager; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.log4j.Logger; + +public class ClusterControllerManager extends ControllerManager implements Runnable, ZkTestManager +{ + private static Logger LOG = Logger.getLogger(ClusterControllerManager.class); + + private final CountDownLatch _startCountDown = new CountDownLatch(1); + private final CountDownLatch _stopCountDown = new CountDownLatch(1); + private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1); + + public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) + { + super(zkAddr, clusterName, controllerName); + } + + public void syncStop() + { + _stopCountDown.countDown(); + try + { + _waitStopFinishCountDown.await(); + } + catch (InterruptedException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void syncStart() + { + // TODO: prevent start multiple times + new Thread(this).start(); + try + { + _startCountDown.await(); + } + catch (InterruptedException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void run() + { + try + { + connect(); + _startCountDown.countDown(); + _stopCountDown.await(); + } + catch (Exception e) + { + LOG.error("exception running controller-manager", e); + } + finally + { + _startCountDown.countDown(); + disconnect(); + _waitStopFinishCountDown.countDown(); + } + } + + @Override + public ZkClient getZkClient() { + return _zkclient; + } + + @Override + public List<CallbackHandler> getHandlers() { + return _handlers; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d392410/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index e171539..dccad56 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -19,8 +19,10 @@ package org.apache.helix.integration.manager; * under the License. */ +import java.util.List; import java.util.concurrent.CountDownLatch; +import org.apache.helix.manager.zk.CallbackHandler; import org.apache.helix.manager.zk.ParticipantManager; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory; @@ -32,7 +34,7 @@ import org.apache.helix.participant.StateMachineEngine; import org.apache.log4j.Logger; -public class MockParticipantManager extends ParticipantManager implements Runnable +public class MockParticipantManager extends ParticipantManager implements Runnable, ZkTestManager { private static Logger LOG = Logger.getLogger(MockParticipantManager.class); private final String _instanceName; @@ -133,7 +135,13 @@ public class MockParticipantManager extends ParticipantManager implements Runnab } } + @Override public ZkClient getZkClient() { return _zkclient; } + + @Override + public List<CallbackHandler> getHandlers() { + return _handlers; + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d392410/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java deleted file mode 100644 index 74774d1..0000000 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.helix.integration.manager; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.List; - -import org.apache.helix.manager.zk.CallbackHandler; -import org.apache.helix.manager.zk.ParticipantManager; -import org.apache.helix.manager.zk.ZkClient; - -// ZkHelixManager used for test only. expose more class members -public class TestAbstractManager extends ParticipantManager { - - public TestAbstractManager(String zkConnectString, String clusterName, - String instanceName) throws Exception { - super(zkConnectString, clusterName, instanceName); - } - - public ZkClient getZkClient() { - return _zkclient; - } - - public List<CallbackHandler> getHandlers() { - return _handlers; - } -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d392410/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java new file mode 100644 index 0000000..9ace407 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java @@ -0,0 +1,592 @@ +package org.apache.helix.integration.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.helix.CurrentStateChangeListener; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkTestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.manager.zk.CallbackHandler; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.CurrentState; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestZkCallbackHandlerLeak extends ZkUnitTestBase +{ + + @Test + public void testCbHdlrLeakOnParticipantSessionExpiry() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 2; + + System.out.println("START " + clusterName + " at " + + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 32, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + // start controller + final ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.connect(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) + { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = + new MockParticipantManager(ZK_ADDR, clusterName, instanceName, null); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // check controller zk-watchers + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +// System.out.println("all watchers: " + watchers); + Set<String> watchPaths = watchers.get("0x" + controller.getSessionId()); +// System.out.println("controller watch paths: " + watchPaths); + + // controller should have 5 + 2n + m + (m+2)n zk-watchers + // where n is number of nodes and m is number of resources + return watchPaths.size() == (6 + 5 * n); + } + }, 500); + Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers."); + + // check participant zk-watchers + final MockParticipantManager participantManagerToExpire = participants[0]; + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); + Set<String> watchPaths = + watchers.get("0x" + participantManagerToExpire.getSessionId()); + // System.out.println("participant watch paths: " + watchPaths); + + // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER + return watchPaths.size() == 2; + } + }, + 500); + Assert.assertTrue(result, "Participant should have 2 zk-watchers."); + + // check HelixManager#_handlers + // printHandlers(controllerManager); + // printHandlers(participantManagerToExpire); + int controllerHandlerNb = controller.getHandlers().size(); + int particHandlerNb = participantManagerToExpire.getHandlers().size(); + Assert.assertEquals(controllerHandlerNb, + (5 + 2 * n), + "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); + Assert.assertEquals(particHandlerNb, + 2, + "HelixParticipant should have 2 (msg+cur-state) callback handlers"); + + // expire the session of participant + System.out.println("Expiring participant session..."); + String oldSessionId = participantManagerToExpire.getSessionId(); + + ZkTestHelper.expireSession(participantManagerToExpire.getZkClient()); + String newSessionId = participantManagerToExpire.getSessionId(); + System.out.println("Expried participant session. oldSessionId: " + oldSessionId + + ", newSessionId: " + newSessionId); + + result = + ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // check controller zk-watchers + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); + Set<String> watchPaths = watchers.get("0x" + controller.getSessionId()); + // System.out.println("controller watch paths after session expiry: " + + // watchPaths); + + // controller should have 5 + 2n + m + (m+2)n zk-watchers + // where n is number of nodes and m is number of resources + return watchPaths.size() == (6 + 5 * n); + } + }, 500); + Assert.assertTrue(result, + "Controller should have 6 + 5*n zk-watchers after session expiry."); + + // check participant zk-watchers + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); + Set<String> watchPaths = + watchers.get("0x" + participantManagerToExpire.getSessionId()); + // System.out.println("participant watch paths after session expiry: " + + // watchPaths); + + // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER + return watchPaths.size() == 2; + } + }, + 500); + Assert.assertTrue(result, + "Participant should have 2 zk-watchers after session expiry."); + + // check handlers + // printHandlers(controllerManager); + // printHandlers(participantManagerToExpire); + int handlerNb = controller.getHandlers().size(); + Assert.assertEquals(handlerNb, + controllerHandlerNb, + "controller callback handlers should not increase after participant session expiry"); + handlerNb = participantManagerToExpire.getHandlers().size(); + Assert.assertEquals(handlerNb, + particHandlerNb, + "participant callback handlers should not increase after participant session expiry"); + + System.out.println("END " + clusterName + " at " + + new Date(System.currentTimeMillis())); + } + + @Test + public void testCbHdlrLeakOnControllerSessionExpiry() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 2; + + System.out.println("START " + clusterName + " at " + + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 32, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + final ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) + { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = + new MockParticipantManager(ZK_ADDR, clusterName, instanceName, null); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // wait until we get all the listeners registered + final MockParticipantManager participantManager = participants[0]; + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + int controllerHandlerNb = controller.getHandlers().size(); + int particHandlerNb = participantManager.getHandlers().size(); + if (controllerHandlerNb == 9 && particHandlerNb == 2) + return true; + else + return false; + } + }, 1000); + + int controllerHandlerNb = controller.getHandlers().size(); + int particHandlerNb = participantManager.getHandlers().size(); + Assert.assertEquals(controllerHandlerNb, + (5 + 2 * n), + "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " + + controllerHandlerNb + ", " + printHandlers(controller)); + Assert.assertEquals(particHandlerNb, + 2, + "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was " + + particHandlerNb + ", " + printHandlers(participantManager)); + + // expire controller + System.out.println("Expiring controller session..."); + String oldSessionId = controller.getSessionId(); + + ZkTestHelper.expireSession(controller.getZkClient()); + String newSessionId = controller.getSessionId(); + System.out.println("Expired controller session. oldSessionId: " + oldSessionId + + ", newSessionId: " + newSessionId); + + result = + ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // check controller zk-watchers + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); + Set<String> watchPaths = watchers.get("0x" + controller.getSessionId()); + // System.out.println("controller watch paths after session expiry: " + + // watchPaths); + + // controller should have 5 + 2n + m + (m+2)n zk-watchers + // where n is number of nodes and m is number of resources + return watchPaths.size() == (6 + 5 * n); + } + }, 500); + Assert.assertTrue(result, + "Controller should have 6 + 5*n zk-watchers after session expiry."); + + // check participant zk-watchers + result = TestHelper.verify(new TestHelper.Verifier() + { + + @Override + public boolean verify() throws Exception + { + Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); + Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId()); + // System.out.println("participant watch paths after session expiry: " + + // watchPaths); + + // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER + return watchPaths.size() == 2; + } + }, 500); + Assert.assertTrue(result, + "Participant should have 2 zk-watchers after session expiry."); + + // check HelixManager#_handlers + // printHandlers(controllerManager); + int handlerNb = controller.getHandlers().size(); + Assert.assertEquals(handlerNb, + controllerHandlerNb, + "controller callback handlers should not increase after participant session expiry, but was " + + printHandlers(controller)); + handlerNb = participantManager.getHandlers().size(); + Assert.assertEquals(handlerNb, + particHandlerNb, + "participant callback handlers should not increase after participant session expiry, but was " + + printHandlers(participantManager)); + + System.out.println("END " + clusterName + " at " + + new Date(System.currentTimeMillis())); + } + + @Test + public void testRemoveUserCbHdlrOnPathRemoval() throws Exception + { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 3; + final String zkAddr = ZK_ADDR; + System.out.println("START " + clusterName + " at " + + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource + 32, // partitions + n, // nodes + 2, // replicas + "MasterSlave", + true); + + final ClusterControllerManager controller = + new ClusterControllerManager(zkAddr, clusterName, "controller"); + controller.syncStart(); + + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) + { + String instanceName = "localhost_" + (12918 + i); + participants[i] = + new MockParticipantManager(zkAddr, clusterName, instanceName, null); + participants[i].syncStart(); + + // register a controller listener on participant_0 + if (i == 0) + { + MockParticipantManager manager = participants[0]; + manager.addCurrentStateChangeListener(new CurrentStateChangeListener() + { + @Override + public void onStateChange(String instanceName, + List<CurrentState> statesInfo, + NotificationContext changeContext) + { + // To change body of implemented methods use File | Settings | File Templates. + // System.out.println(instanceName + " on current-state change, type: " + + // changeContext.getType()); + } + }, manager.getInstanceName(), manager.getSessionId()); + } + } + + Boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, + clusterName)); + Assert.assertTrue(result); + + MockParticipantManager participantToExpire = participants[0]; + String oldSessionId = participantToExpire.getSessionId(); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); + + // check manager#hanlders + Assert.assertEquals(participantToExpire.getHandlers().size(), + 3, + "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES"); + + // check zkclient#listeners + Map<String, Set<IZkDataListener>> dataListeners = + ZkTestHelper.getZkDataListener(participantToExpire.getZkClient()); + Map<String, Set<IZkChildListener>> childListeners = + ZkTestHelper.getZkChildListener(participantToExpire.getZkClient()); + // printZkListeners(participantToExpire.getZkClient()); + Assert.assertEquals(dataListeners.size(), + 1, + "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners"); + String path = + keyBuilder.currentState(participantToExpire.getInstanceName(), + oldSessionId, + "TestDB0").getPath(); + Assert.assertEquals(dataListeners.get(path).size(), + 1, + "Should have 1 data-listeners on path: " + path); + Assert.assertEquals(childListeners.size(), + 3, + "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener"); + path = + keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId) + .getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 1, + "Should have 1 child-listener on path: " + path); + path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 1, + "Should have 1 child-listener on path: " + path); + path = keyBuilder.controller().getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 1, + "Should have 1 child-listener on path: " + path); + + // check zookeeper#watches on client side + Map<String, List<String>> watchPaths = + ZkTestHelper.getZkWatch(participantToExpire.getZkClient()); + // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + + // "\n"); + Assert.assertEquals(watchPaths.get("dataWatches").size(), + 4, + "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES"); + Assert.assertEquals(watchPaths.get("childWatches").size(), + 3, + "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}"); + + // expire localhost_12918 + System.out.println("Expire participant: " + participantToExpire.getInstanceName() + + ", session: " + participantToExpire.getSessionId()); + ZkTestHelper.expireSession(participantToExpire.getZkClient()); + String newSessionId = participantToExpire.getSessionId(); + System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + + oldSessionId + ", newSessionId: " + newSessionId); + result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, + clusterName)); + Assert.assertTrue(result); + + // check manager#hanlders + Assert.assertEquals(participantToExpire.getHandlers().size(), + 2, + "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()"); + + // check zkclient#listeners + dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient()); + childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient()); + // printZkListeners(participantToExpire.getZkClient()); + Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners"); + Assert.assertEquals(childListeners.size(), + 3, + "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). " + + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())"); + path = + keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId) + .getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 0, + "Should have no child-listener on path: " + path); + path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 1, + "Should have 1 child-listener on path: " + path); + path = keyBuilder.controller().getPath(); + Assert.assertEquals(childListeners.get(path).size(), + 1, + "Should have 1 child-listener on path: " + path); + + // check zookeeper#watches on client side + watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient()); + // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + + // "\n"); + Assert.assertEquals(watchPaths.get("dataWatches").size(), + 2, + "Should have 2 data-watches: CONTROLLER and MESSAGES"); + Assert.assertEquals(watchPaths.get("childWatches").size(), + 2, + "Should have 2 child-watches: CONTROLLER and MESSAGES"); + Assert.assertEquals(watchPaths.get("existWatches").size(), + 2, + "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0"); + + // another session expiry on localhost_12918 should clear the two exist-watches on + // CURRENTSTATE/{oldSessionId} + System.out.println("Expire participant: " + participantToExpire.getInstanceName() + + ", session: " + participantToExpire.getSessionId()); + ZkTestHelper.expireSession(participantToExpire.getZkClient()); + result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, + clusterName)); + Assert.assertTrue(result); + + // check zookeeper#watches on client side + watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient()); + // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + + // "\n"); + Assert.assertEquals(watchPaths.get("dataWatches").size(), + 2, + "Should have 2 data-watches: CONTROLLER and MESSAGES"); + Assert.assertEquals(watchPaths.get("childWatches").size(), + 2, + "Should have 2 child-watches: CONTROLLER and MESSAGES"); + Assert.assertEquals(watchPaths.get("existWatches").size(), + 0, + "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession"); + + // Thread.sleep(1000); + System.out.println("END " + clusterName + " at " + + new Date(System.currentTimeMillis())); + } + + // debug code + static String printHandlers(ZkTestManager manager) + { + StringBuilder sb = new StringBuilder(); + List<CallbackHandler> handlers = manager.getHandlers(); + sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. ["); + + for (int i = 0; i < handlers.size(); i++) + { + CallbackHandler handler = handlers.get(i); + String path = handler.getPath(); + sb.append(path.substring(manager.getClusterName().length() + 1) + ": " + + handler.getListener()); + if (i < (handlers.size() - 1)) + { + sb.append(", "); + } + } + sb.append("]"); + + return sb.toString(); + } + + void printZkListeners(ZkClient client) throws Exception{ + Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client); + Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client); + + System.out.println("dataListeners {"); + for (String path : datalisteners.keySet()) { + System.out.println("\t" + path + ": "); + Set<IZkDataListener> set = datalisteners.get(path); + for (IZkDataListener listener : set) { + CallbackHandler handler = (CallbackHandler)listener; + System.out.println("\t\t" + handler.getListener()); + } + } + System.out.println("}"); + + System.out.println("childListeners {"); + for (String path : childListeners.keySet()) { + System.out.println("\t" + path + ": "); + Set<IZkChildListener> set = childListeners.get(path); + for (IZkChildListener listener : set) { + CallbackHandler handler = (CallbackHandler)listener; + System.out.println("\t\t" + handler.getListener()); + } + } + System.out.println("}"); +} +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d392410/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java new file mode 100644 index 0000000..883657e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java @@ -0,0 +1,36 @@ +package org.apache.helix.integration.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; + +import org.apache.helix.manager.zk.CallbackHandler; +import org.apache.helix.manager.zk.ZkClient; + +public interface ZkTestManager +{ + ZkClient getZkClient(); + + List<CallbackHandler> getHandlers(); + + String getInstanceName(); + + String getClusterName(); +}
