[HELIX-268] Atomic API, rb=14578
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e39b924b Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e39b924b Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e39b924b Branch: refs/heads/master Commit: e39b924b5d3975b90e530c17be7bdf2239affe86 Parents: 61643b1 Author: Kanak Biscuitwala <[email protected]> Authored: Thu Oct 10 17:59:31 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Nov 6 13:17:37 2013 -0800 ---------------------------------------------------------------------- .../helix/api/accessor/AtomicClusterAccessor.java | 6 +++++- .../api/accessor/AtomicParticipantAccessor.java | 6 +++++- .../helix/api/accessor/AtomicResourceAccessor.java | 6 +++++- .../java/org/apache/helix/lock/zk/ZKHelixLock.java | 8 +++++--- .../apache/helix/model/ClusterConfiguration.java | 16 ---------------- .../helix/api/accessor/TestAccessorRecreate.java | 10 +++++++++- .../helix/api/accessor/TestAtomicAccessors.java | 12 +++++------- .../controller/stages/TestMessageThrottleStage.java | 8 ++++++++ .../controller/stages/TestRebalancePipeline.java | 14 ++++++++++++++ .../org/apache/helix/lock/zk/TestZKHelixLock.java | 14 ++++++++++++-- 10 files changed, 68 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java index 1196770..37cc47e 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -46,7 +46,11 @@ import com.google.common.collect.Sets; * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition - * may fail, in which case users should handle the return value of each function if necessary. + * may fail, in which case users should handle the return value of each function if necessary. <br/> + * <br/> + * Using this class is quite expensive; it should thus be used sparingly and only in systems where + * contention on these operations is expected. For most systems running Helix, this is typically not + * the case. */ public class AtomicClusterAccessor extends ClusterAccessor { private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java index 05fb0ec..1c734e3 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java @@ -38,7 +38,11 @@ import org.apache.log4j.Logger; * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition - * may fail, in which case users should handle the return value of each function if necessary. + * may fail, in which case users should handle the return value of each function if necessary. <br/> + * <br/> + * Using this class is quite expensive; it should thus be used sparingly and only in systems where + * contention on these operations is expected. For most systems running Helix, this is typically not + * the case. */ public class AtomicParticipantAccessor extends ParticipantAccessor { private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java index 4bb2ebe..6d69981 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java @@ -34,7 +34,11 @@ import org.apache.log4j.Logger; * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition - * may fail, in which case users should handle the return value of each function if necessary. + * may fail, in which case users should handle the return value of each function if necessary. <br/> + * <br/> + * Using this class is quite expensive; it should thus be used sparingly and only in systems where + * contention on these operations is expected. For most systems running Helix, this is typically not + * the case. */ public class AtomicResourceAccessor extends ResourceAccessor { private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java index 4d3b459..1d56f13 100644 --- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java @@ -92,9 +92,9 @@ public class ZKHelixLock implements HelixLock { @Override public synchronized boolean lock() { _canceled = false; - if (_locked) { - // no need to proceed if the lock is already acquired - return true; + if (_locked || isBlocked()) { + // no need to proceed if the lock is already acquired or already waiting + return false; } try { // create the root path if it doesn't exist @@ -132,6 +132,8 @@ public class ZKHelixLock implements HelixLock { if (LOG.isInfoEnabled()) { LOG.info("Unlock skipped because lock node was not present"); } + } catch (RuntimeException e) { + LOG.error("Error connecting to release the lock"); } _locked = false; return true; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java index 1278ceb..6887f24 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java @@ -66,22 +66,6 @@ public class ClusterConfiguration extends HelixProperty { } /** - * Set the identifier of this configuration for the last write - * @param writeId positive random long identifier - */ - public void setWriteId(long writeId) { - _record.setLongField(Fields.WRITE_ID.toString(), writeId); - } - - /** - * Get the identifier for the last write to this configuration - * @return positive write identifier, or -1 of not set - */ - public long getWriteId() { - return _record.getLongField(Fields.WRITE_ID.toString(), -1); - } - - /** * Create a new ClusterConfiguration from a UserConfig * @param userConfig user-defined configuration properties * @return ClusterConfiguration http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java index a92f12a..4eebbc6 100644 --- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java +++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java @@ -50,7 +50,7 @@ public class TestAccessorRecreate extends ZkUnitTestBase { @Test public void testRecreateCluster() { final String MODIFIER = "modifier"; - final ClusterId clusterId = ClusterId.from("testCluster"); + final ClusterId clusterId = ClusterId.from("TestAccessorRecreate!testCluster"); // connect boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS); @@ -159,4 +159,12 @@ public class TestAccessorRecreate extends ZkUnitTestBase { .userConfig(userConfig).build(); return accessor.addParticipantToCluster(participant); } + // private HelixLockable lockProvider() { + // return new HelixLockable() { + // @Override + // public HelixLock getLock(ClusterId clusterId, Scope<?> scope) { + // return new ZKHelixLock(clusterId, scope, _gZkClient); + // } + // }; + // } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java index 4087796..443c3db 100644 --- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java +++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java @@ -49,7 +49,7 @@ public class TestAtomicAccessors extends ZkUnitTestBase { @Test public void testClusterUpdates() { - final ClusterId clusterId = ClusterId.from("testCluster"); + final ClusterId clusterId = ClusterId.from("TestAtomicAccessors!testCluster"); final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); final HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor); @@ -160,15 +160,13 @@ public class TestAtomicAccessors extends ZkUnitTestBase { } @Override - public boolean lock() { + public synchronized boolean lock() { // synchronize here to ensure atomic set and so that the first lock is the first one who // gets to lock - synchronized (LockProvider.this) { - if (_firstLock == null) { - _firstLock = this; - } - return super.lock(); + if (_firstLock == null) { + _firstLock = this; } + return super.lock(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 450d654..0bd8795 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -31,6 +31,8 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.api.accessor.ClusterAccessor; +import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.MessageId; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; @@ -70,6 +72,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { }); setupStateModel(clusterName); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + ClusterEvent event = new ClusterEvent("testEvent"); event.addAttribute("helixmanager", manager); @@ -143,6 +148,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { }); setupStateModel(clusterName); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + // setup constraints ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 5a7c6ac..c9e0f53 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -30,6 +30,8 @@ import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.api.State; +import org.apache.helix.api.accessor.ClusterAccessor; +import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; import org.apache.helix.api.id.SessionId; @@ -77,6 +79,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { }); setupStateModel(clusterName); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new NewReadClusterDataStage()); @@ -153,6 +158,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { 0, 1 }); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + TestHelper .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); @@ -226,6 +234,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { }); setupStateModel(clusterName); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new NewReadClusterDataStage()); @@ -328,6 +339,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { }); setupStateModel(clusterName); + ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); + clusterAccessor.initClusterStructure(); + // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new NewReadClusterDataStage()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java index 4e023ba..f39ca11 100644 --- a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java +++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java @@ -35,8 +35,10 @@ import org.testng.annotations.Test; */ public class TestZKHelixLock extends ZkUnitTestBase { @Test - public void basicTest() { - _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS); + public void basicTest() throws InterruptedException { + final long TIMEOUT = 30000; + final long RETRY_INTERVAL = 100; + _gZkClient.waitUntilConnected(TIMEOUT, TimeUnit.MILLISECONDS); final AtomicBoolean t1Locked = new AtomicBoolean(false); final AtomicBoolean t1Done = new AtomicBoolean(false); final AtomicInteger field1 = new AtomicInteger(0); @@ -103,6 +105,14 @@ public class TestZKHelixLock extends ZkUnitTestBase { Assert.assertEquals(field2.get(), 1); // unlock t1's lock after checking that t2 is blocked + long count = 0; + while (!lock2.isBlocked()) { + if (count > TIMEOUT) { + break; + } + Thread.sleep(RETRY_INTERVAL); + count += RETRY_INTERVAL; + } Assert.assertTrue(lock2.isBlocked()); lock1.unlock();
