Updated Branches: refs/heads/helix-logical-model 101fe1e9a -> bdb7a4d3f
[HELIX-238] Assorted fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/bdb7a4d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/bdb7a4d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/bdb7a4d3 Branch: refs/heads/helix-logical-model Commit: bdb7a4d3fc4c29685ba47f56a6c44e1ae1dc5577 Parents: 101fe1e Author: Kanak Biscuitwala <[email protected]> Authored: Thu Oct 10 10:35:41 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Thu Oct 10 10:35:41 2013 -0700 ---------------------------------------------------------------------- .../api/accessor/AtomicClusterAccessor.java | 2 +- .../api/accessor/AtomicParticipantAccessor.java | 7 +-- .../api/accessor/AtomicResourceAccessor.java | 2 +- .../helix/api/accessor/ClusterAccessor.java | 23 ++++++--- .../helix/api/accessor/ParticipantAccessor.java | 42 +++++++++------- .../helix/api/accessor/ResourceAccessor.java | 28 +++++++---- .../org/apache/helix/lock/zk/ZKHelixLock.java | 53 +++----------------- .../helix/model/ClusterConfiguration.java | 20 ++++++++ .../helix/api/accessor/TestAtomicAccessors.java | 12 +++-- 9 files changed, 95 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java index d17b2af..1196770 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -57,7 +57,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { private final ClusterId _clusterId; /** - * Non-atomic instance to protect against recursive locking via polymorphism + * Non-atomic instance to protect against reentrant locking via polymorphism */ private final ClusterAccessor _clusterAccessor; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java index fd05b48..05fb0ec 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java @@ -48,7 +48,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { private final HelixLockable _lockProvider; /** - * Non-atomic instance to protect against recursive locking via polymorphism + * Non-atomic instance to protect against reentrant locking via polymorphism */ private final ParticipantAccessor _participantAccessor; @@ -68,16 +68,17 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { } @Override - void enableParticipant(ParticipantId participantId, boolean isEnabled) { + boolean enableParticipant(ParticipantId participantId, boolean isEnabled) { HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); boolean locked = lock.lock(); if (locked) { try { - _participantAccessor.enableParticipant(participantId); + return _participantAccessor.enableParticipant(participantId); } finally { lock.unlock(); } } + return false; } @Override http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java index 95c0b05..4bb2ebe 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java @@ -44,7 +44,7 @@ public class AtomicResourceAccessor extends ResourceAccessor { private final HelixLockable _lockProvider; /** - * Non-atomic instance to protect against recursive locking via polymorphism + * Non-atomic instance to protect against reentrant locking via polymorphism */ private final ResourceAccessor _resourceAccessor; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java index ea7dbb9..abeb649 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java @@ -161,6 +161,7 @@ public class ClusterAccessor { return false; } ClusterConfiguration configuration = ClusterConfiguration.from(config.getUserConfig()); + configuration.setAutoJoinAllowed(config.autoJoinAllowed()); _accessor.setProperty(_keyBuilder.clusterConfig(), configuration); Map<ConstraintType, ClusterConstraints> constraints = config.getConstraintMap(); for (ConstraintType type : constraints.keySet()) { @@ -465,7 +466,8 @@ public class ClusterAccessor { */ public boolean addStat(final String statName) { if (!isClusterStructureValid()) { - throw new HelixException("cluster " + _clusterId + " is not setup yet"); + LOG.error("cluster " + _clusterId + " is not setup yet"); + return false; } String persistentStatsPath = _keyBuilder.persistantStat().getPath(); @@ -496,7 +498,8 @@ public class ClusterAccessor { */ public boolean dropStat(final String statName) { if (!isClusterStructureValid()) { - throw new HelixException("cluster " + _clusterId + " is not setup yet"); + LOG.error("cluster " + _clusterId + " is not setup yet"); + return false; } String persistentStatsPath = _keyBuilder.persistantStat().getPath(); @@ -528,7 +531,8 @@ public class ClusterAccessor { */ public boolean addAlert(final String alertName) { if (!isClusterStructureValid()) { - throw new HelixException("cluster " + _clusterId + " is not setup yet"); + LOG.error("cluster " + _clusterId + " is not setup yet"); + return false; } BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor(); @@ -564,7 +568,8 @@ public class ClusterAccessor { */ public boolean dropAlert(final String alertName) { if (!isClusterStructureValid()) { - throw new HelixException("cluster " + _clusterId + " is not setup yet"); + LOG.error("cluster " + _clusterId + " is not setup yet"); + return false; } String alertsPath = _keyBuilder.alerts().getPath(); @@ -597,16 +602,18 @@ public class ClusterAccessor { /** * pause controller of cluster + * @return true if cluster was paused, false if pause failed or already paused */ - public void pauseCluster() { - _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause")); + public boolean pauseCluster() { + return _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause")); } /** * resume controller of cluster + * @return true if resume succeeded, false otherwise */ - public void resumeCluster() { - _accessor.removeProperty(_keyBuilder.pause()); + public boolean resumeCluster() { + return _accessor.removeProperty(_keyBuilder.pause()); } /** http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java index 4e7d3c2..ac8f79d 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java @@ -86,34 +86,36 @@ public class ParticipantAccessor { * enable/disable a participant * @param participantId * @param isEnabled + * @return true if enable state succeeded, false otherwise */ - void enableParticipant(ParticipantId participantId, boolean isEnabled) { + boolean enableParticipant(ParticipantId participantId, boolean isEnabled) { String participantName = participantId.stringify(); if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) { LOG.error("Config for participant: " + participantId + " does NOT exist in cluster"); - return; + return false; } InstanceConfig config = new InstanceConfig(participantName); config.setInstanceEnabled(isEnabled); - _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config); - + return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config); } /** * disable participant * @param participantId + * @return true if disabled successfully, false otherwise */ - public void disableParticipant(ParticipantId participantId) { - enableParticipant(participantId, false); + public boolean disableParticipant(ParticipantId participantId) { + return enableParticipant(participantId, false); } /** * enable participant * @param participantId + * @return true if enabled successfully, false otherwise */ - public void enableParticipant(ParticipantId participantId) { - enableParticipant(participantId, true); + public boolean enableParticipant(ParticipantId participantId) { + return enableParticipant(participantId, true); } /** @@ -173,8 +175,9 @@ public class ParticipantAccessor { * @param participantId * @param resourceId * @param partitionIdSet + * @return true if enable state changed successfully, false otherwise */ - void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId, + boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId, final ResourceId resourceId, final Set<PartitionId> partitionIdSet) { String participantName = participantId.stringify(); String resourceName = resourceId.stringify(); @@ -183,7 +186,7 @@ public class ParticipantAccessor { PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName); if (_accessor.getProperty(instanceConfigKey) == null) { LOG.error("Config for participant: " + participantId + " does NOT exist in cluster"); - return; + return false; } // check resource exist. warn if not @@ -211,7 +214,7 @@ public class ParticipantAccessor { partitionNames.add(partitionId.stringify()); } - baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() { + return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord currentData) { if (currentData == null) { @@ -245,10 +248,11 @@ public class ParticipantAccessor { * @param participantId * @param resourceId * @param disablePartitionIdSet + * @return true if disabled successfully, false otherwise */ - public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId, - Set<PartitionId> disablePartitionIdSet) { - enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet); + public boolean disablePartitionsForParticipant(ParticipantId participantId, + ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) { + return enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet); } /** @@ -256,10 +260,11 @@ public class ParticipantAccessor { * @param participantId * @param resourceId * @param enablePartitionIdSet + * @return true if enabled successfully, false otherwise */ - public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId, + public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId, Set<PartitionId> enablePartitionIdSet) { - enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet); + return enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet); } /** @@ -600,10 +605,11 @@ public class ParticipantAccessor { * @param resourceId resource id * @param participantId participant id * @param sessionId session id + * @return true if dropped, false otherwise */ - public void dropCurrentState(ResourceId resourceId, ParticipantId participantId, + public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId, SessionId sessionId) { - _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(), + return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(), sessionId.stringify(), resourceId.stringify())); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java index 7041c5e..58b226d 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java @@ -108,9 +108,10 @@ public class ResourceAccessor { * save resource assignment * @param resourceId * @param resourceAssignment + * @return true if set, false otherwise */ - public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) { - _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()), + public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) { + return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()), resourceAssignment); } @@ -128,9 +129,11 @@ public class ResourceAccessor { * rebalancer configuration * @param resourceId * @param configuration + * @return true if set, false otherwise */ - private void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) { - _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration); + private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) { + boolean status = + _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration); // also set an ideal state if the resource supports it RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration); IdealState idealState = @@ -139,6 +142,7 @@ public class ResourceAccessor { if (idealState != null) { _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState); } + return status; } /** @@ -249,27 +253,29 @@ public class ResourceAccessor { * Get a resource configuration, which may include user-defined configuration, as well as * rebalancer configuration * @param resourceId - * @return configuration + * @return configuration or null */ - public void getConfiguration(ResourceId resourceId) { - _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())); + public ResourceConfiguration getConfiguration(ResourceId resourceId) { + return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())); } /** * set external view of a resource * @param resourceId * @param extView + * @return true if set, false otherwise */ - public void setExternalView(ResourceId resourceId, ExternalView extView) { - _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView); + public boolean setExternalView(ResourceId resourceId, ExternalView extView) { + return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView); } /** * drop external view of a resource * @param resourceId + * @return true if dropped, false otherwise */ - public void dropExternalView(ResourceId resourceId) { - _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify())); + public boolean dropExternalView(ResourceId resourceId) { + return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify())); } /** http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java index 70614b3..4d3b459 100644 --- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java @@ -35,6 +35,7 @@ import org.apache.zookeeper.ZooKeeper; /** * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation * Please use the following lock order convention: Cluster, Participant, Resource, Partition + * WARNING: this is not a reentrant lock */ public class ZKHelixLock implements HelixLock { private static final Logger LOG = Logger.getLogger(ZKHelixLock.class); @@ -96,8 +97,11 @@ public class ZKHelixLock implements HelixLock { return true; } try { + // create the root path if it doesn't exist BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); baseAccessor.create(_rootPath, null, AccessOption.PERSISTENT); + + // try to acquire the lock boolean acquired = _writeLock.lock(); if (acquired) { _locked = true; @@ -134,7 +138,7 @@ public class ZKHelixLock implements HelixLock { } @Override - public synchronized boolean isBlocked() { + public boolean isBlocked() { return _blocked; } @@ -142,52 +146,7 @@ public class ZKHelixLock implements HelixLock { * Set if this the lock method is currently blocked * @param isBlocked true if blocked, false otherwise */ - protected synchronized void setBlocked(boolean isBlocked) { + protected void setBlocked(boolean isBlocked) { _blocked = isBlocked; } - - public static void main(String[] args) { - ZkClient zkClient = new ZkClient("localhost:2199"); - ClusterId clusterId = ClusterId.from("exampleCluster"); - final ZKHelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient); - final ZKHelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient); - System.err.println("lock1 started"); - boolean result = lock1.lock(); - System.err.println("lock1 finished " + result); - new Thread() { - @Override - public void run() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - System.err.println("unlock1 started"); - lock1.unlock(); - System.err.println("unlock1 finished"); - } - }.start(); - final Thread t1 = new Thread() { - @Override - public void run() { - System.err.println("lock2 started"); - boolean locked = lock2.lock(); - System.err.println("lock2 finished " + locked); - } - }; - t1.start(); - new Thread() { - @Override - public void run() { - try { - Thread.sleep(5000); - System.err.println("interrupt2 start"); - t1.interrupt(); - System.err.println("interrupt2 finished"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }.start(); - } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java index d733a5c..1278ceb 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java @@ -29,6 +29,10 @@ import org.apache.helix.api.id.ClusterId; * Persisted configuration properties for a cluster */ public class ClusterConfiguration extends HelixProperty { + private enum Fields { + WRITE_ID + } + /** * Instantiate for an id * @param id cluster id @@ -62,6 +66,22 @@ public class ClusterConfiguration extends HelixProperty { } /** + * Set the identifier of this configuration for the last write + * @param writeId positive random long identifier + */ + public void setWriteId(long writeId) { + _record.setLongField(Fields.WRITE_ID.toString(), writeId); + } + + /** + * Get the identifier for the last write to this configuration + * @return positive write identifier, or -1 of not set + */ + public long getWriteId() { + return _record.getLongField(Fields.WRITE_ID.toString(), -1); + } + + /** * Create a new ClusterConfiguration from a UserConfig * @param userConfig user-defined configuration properties * @return ClusterConfiguration http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java index 8dbb43f..4087796 100644 --- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java +++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java @@ -160,13 +160,15 @@ public class TestAtomicAccessors extends ZkUnitTestBase { } @Override - public synchronized boolean lock() { + public boolean lock() { // synchronize here to ensure atomic set and so that the first lock is the first one who // gets to lock - if (_firstLock == null) { - _firstLock = this; + synchronized (LockProvider.this) { + if (_firstLock == null) { + _firstLock = this; + } + return super.lock(); } - return super.lock(); } @Override @@ -186,7 +188,7 @@ public class TestAtomicAccessors extends ZkUnitTestBase { } @Override - protected synchronized void setBlocked(boolean isBlocked) { + protected void setBlocked(boolean isBlocked) { if (isBlocked) { synchronized (_hasSecondBlocked) { _hasSecondBlocked.set(true);
