Updated Branches: refs/heads/helix-logical-model fd78c678f -> 558b42c61
[HELIX-268] Increase understandability of atomic API code Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/558b42c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/558b42c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/558b42c6 Branch: refs/heads/helix-logical-model Commit: 558b42c61087861e8ce93f862d288bba43e1b228 Parents: fd78c67 Author: Kanak Biscuitwala <[email protected]> Authored: Wed Oct 9 10:46:09 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Oct 9 10:46:09 2013 -0700 ---------------------------------------------------------------------- .../api/accessor/AtomicClusterAccessor.java | 42 ++++--- .../api/accessor/AtomicParticipantAccessor.java | 118 ++++++++----------- .../api/accessor/AtomicResourceAccessor.java | 37 ++++-- .../helix/api/accessor/ClusterAccessor.java | 6 +- .../helix/api/accessor/ParticipantAccessor.java | 4 +- .../helix/api/accessor/ResourceAccessor.java | 12 +- 6 files changed, 113 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java index 9881e2d..d17b2af 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -57,6 +57,11 @@ public class AtomicClusterAccessor extends ClusterAccessor { private final ClusterId _clusterId; /** + * Non-atomic instance to protect against recursive locking via polymorphism + */ + private final ClusterAccessor _clusterAccessor; + + /** * Instantiate the accessor * @param clusterId the cluster to access * @param accessor a HelixDataAccessor for the physical properties @@ -69,6 +74,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { _accessor = accessor; _keyBuilder = accessor.keyBuilder(); _clusterId = clusterId; + _clusterAccessor = new ClusterAccessor(clusterId, accessor); } @Override @@ -77,7 +83,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.createCluster(cluster); + return _clusterAccessor.createCluster(cluster); } finally { lock.unlock(); } @@ -91,7 +97,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.dropCluster(); + return _clusterAccessor.dropCluster(); } finally { lock.unlock(); } @@ -105,7 +111,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.readCluster(); + return _clusterAccessor.readCluster(); } finally { lock.unlock(); } @@ -123,7 +129,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.addParticipantToCluster(participant); + return _clusterAccessor.addParticipantToCluster(participant); } finally { lock.unlock(); } @@ -137,7 +143,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.dropParticipantFromCluster(participantId); + return _clusterAccessor.dropParticipantFromCluster(participantId); } finally { lock.unlock(); } @@ -155,7 +161,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.addResourceToCluster(resource); + return _clusterAccessor.addResourceToCluster(resource); } finally { lock.unlock(); } @@ -169,7 +175,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.dropResourceFromCluster(resourceId); + return _clusterAccessor.dropResourceFromCluster(resourceId); } finally { lock.unlock(); } @@ -183,14 +189,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { boolean locked = lock.lock(); if (locked) { try { - Cluster cluster = super.readCluster(); - if (cluster == null) { - LOG.error("Cluster does not exist, cannot be updated"); - return null; - } - ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig()); - boolean status = super.setBasicClusterConfig(config); - return status ? config : null; + return _clusterAccessor.updateCluster(clusterDelta); } finally { lock.unlock(); } @@ -241,4 +240,17 @@ public class AtomicClusterAccessor extends ClusterAccessor { } return participants; } + + @Override + public void initClusterStructure() { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + _clusterAccessor.initClusterStructure(); + } finally { + lock.unlock(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java index 8482208..fd05b48 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java @@ -19,23 +19,18 @@ package org.apache.helix.api.accessor; * under the License. */ -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; import org.apache.helix.api.Participant; import org.apache.helix.api.Scope; import org.apache.helix.api.config.ParticipantConfig; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.MessageId; import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext; import org.apache.helix.lock.HelixLock; import org.apache.helix.lock.HelixLockable; -import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; import org.apache.log4j.Logger; @@ -50,16 +45,39 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { private final ClusterId _clusterId; private final HelixDataAccessor _accessor; - private final PropertyKey.Builder _keyBuilder; private final HelixLockable _lockProvider; + /** + * Non-atomic instance to protect against recursive locking via polymorphism + */ + private final ParticipantAccessor _participantAccessor; + + /** + * Instantiate the accessor + * @param clusterId the cluster to access + * @param accessor a HelixDataAccessor for the physical properties + * @param lockProvider a lock provider + */ public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor, HelixLockable lockProvider) { super(accessor); _clusterId = clusterId; _accessor = accessor; - _keyBuilder = accessor.keyBuilder(); _lockProvider = lockProvider; + _participantAccessor = new ParticipantAccessor(accessor); + } + + @Override + void enableParticipant(ParticipantId participantId, boolean isEnabled) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + _participantAccessor.enableParticipant(participantId); + } finally { + lock.unlock(); + } + } } @Override @@ -68,7 +86,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.readParticipant(participantId); + return _participantAccessor.readParticipant(participantId); } finally { lock.unlock(); } @@ -87,7 +105,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.setParticipant(participantConfig); + return _participantAccessor.setParticipant(participantConfig); } finally { lock.unlock(); } @@ -102,14 +120,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - Participant participant = super.readParticipant(participantId); - if (participant == null) { - LOG.error("Participant " + participantId + " does not exist, cannot be updated"); - return null; - } - ParticipantConfig config = participantDelta.mergeInto(participant.getConfig()); - super.setParticipant(config); - return config; + return _participantAccessor.updateParticipant(participantId, participantDelta); } finally { lock.unlock(); } @@ -117,64 +128,13 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { return null; } - /** - * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is - * participant-local and resource-local. - */ - @Override - public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) { - Participant oldParticipant = readParticipant(oldParticipantId); - if (oldParticipant == null) { - LOG.error("Could not swap participants because the old participant does not exist"); - return false; - } - if (oldParticipant.isEnabled()) { - LOG.error("Could not swap participants because the old participant is still enabled"); - return false; - } - if (oldParticipant.isAlive()) { - LOG.error("Could not swap participants because the old participant is still live"); - return false; - } - Participant newParticipant = readParticipant(newParticipantId); - if (newParticipant == null) { - LOG.error("Could not swap participants because the new participant does not exist"); - return false; - } - dropParticipant(oldParticipantId); - ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor); - List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates()); - for (String resourceName : idealStates) { - HelixLock lock = - _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName))); - boolean locked = lock.lock(); - if (locked) { - try { - // lock the resource for all ideal state reads and updates - IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName)); - if (idealState != null) { - swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId); - PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState); - resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context); - _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState); - } - } finally { - lock.unlock(); - } - } else { - return false; - } - } - return true; - } - @Override boolean dropParticipant(ParticipantId participantId) { HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); boolean locked = lock.lock(); if (locked) { try { - return super.dropParticipant(participantId); + return _participantAccessor.dropParticipant(participantId); } finally { lock.unlock(); } @@ -189,7 +149,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - super.insertMessagesToParticipant(participantId, msgMap); + _participantAccessor.insertMessagesToParticipant(participantId, msgMap); } finally { lock.unlock(); } @@ -203,7 +163,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - super.updateMessageStatus(participantId, msgMap); + _participantAccessor.updateMessageStatus(participantId, msgMap); } finally { lock.unlock(); } @@ -217,7 +177,21 @@ public class AtomicParticipantAccessor extends ParticipantAccessor { boolean locked = lock.lock(); if (locked) { try { - super.deleteMessagesFromParticipant(participantId, msgIdSet); + _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet); + } finally { + lock.unlock(); + } + } + return; + } + + @Override + public void initParticipantStructure(ParticipantId participantId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + _participantAccessor.initParticipantStructure(participantId); } finally { lock.unlock(); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java index 4ff50d7..95c0b05 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java @@ -40,13 +40,27 @@ public class AtomicResourceAccessor extends ResourceAccessor { private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class); private final ClusterId _clusterId; + private final HelixDataAccessor _accessor; private final HelixLockable _lockProvider; + /** + * Non-atomic instance to protect against recursive locking via polymorphism + */ + private final ResourceAccessor _resourceAccessor; + + /** + * Instantiate the accessor + * @param clusterId the cluster to access + * @param accessor a HelixDataAccessor for the physical properties + * @param lockProvider a lock provider + */ public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor, HelixLockable lockProvider) { super(accessor); _clusterId = clusterId; + _accessor = accessor; _lockProvider = lockProvider; + _resourceAccessor = new ResourceAccessor(accessor); } @Override @@ -55,7 +69,7 @@ public class AtomicResourceAccessor extends ResourceAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.readResource(resourceId); + return _resourceAccessor.readResource(resourceId); } finally { lock.unlock(); } @@ -69,14 +83,7 @@ public class AtomicResourceAccessor extends ResourceAccessor { boolean locked = lock.lock(); if (locked) { try { - Resource resource = super.readResource(resourceId); - if (resource == null) { - LOG.error("Resource " + resourceId + " does not exist, cannot be updated"); - return null; - } - ResourceConfig config = resourceDelta.mergeInto(resource.getConfig()); - super.setResource(config); - return config; + return _resourceAccessor.updateResource(resourceId, resourceDelta); } finally { lock.unlock(); } @@ -90,7 +97,7 @@ public class AtomicResourceAccessor extends ResourceAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.setRebalancerContext(resourceId, context); + return _resourceAccessor.setRebalancerContext(resourceId, context); } finally { lock.unlock(); } @@ -108,7 +115,7 @@ public class AtomicResourceAccessor extends ResourceAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.setResource(resourceConfig); + return _resourceAccessor.setResource(resourceConfig); } finally { lock.unlock(); } @@ -123,11 +130,17 @@ public class AtomicResourceAccessor extends ResourceAccessor { boolean locked = lock.lock(); if (locked) { try { - return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag); + return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount, + participantGroupTag); } finally { lock.unlock(); } } return false; } + + @Override + protected ParticipantAccessor participantAccessor() { + return new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider); + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java index ba321cf..f283f74 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java @@ -157,7 +157,7 @@ public class ClusterAccessor { * @param config ClusterConfig * @return true if correctly set, false otherwise */ - protected boolean setBasicClusterConfig(ClusterConfig config) { + private boolean setBasicClusterConfig(ClusterConfig config) { if (config == null) { return false; } @@ -718,7 +718,7 @@ public class ClusterAccessor { /** * Remove all but the top level cluster node; intended for reconstructing the cluster */ - void clearClusterStructure() { + private void clearClusterStructure() { BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor(); List<String> paths = getRequiredPaths(_keyBuilder); baseAccessor.remove(paths, 0); @@ -729,7 +729,7 @@ public class ClusterAccessor { * @param keyBuilder a PropertyKey.Builder for the cluster * @return list of paths as strings */ - static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) { + private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) { List<String> paths = Lists.newArrayList(); paths.add(keyBuilder.clusterConfigs().getPath()); paths.add(keyBuilder.instanceConfigs().getPath()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java index dd6c77b..90fd986 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java @@ -655,7 +655,7 @@ public class ParticipantAccessor { return false; } dropParticipant(oldParticipantId); - ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor); + ResourceAccessor resourceAccessor = resourceAccessor(); Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates()); for (String resourceName : idealStateMap.keySet()) { IdealState idealState = idealStateMap.get(resourceName); @@ -717,7 +717,7 @@ public class ParticipantAccessor { /** * Clear properties for the participant */ - public void clearParticipantStructure(ParticipantId participantId) { + void clearParticipantStructure(ParticipantId participantId) { List<String> paths = getRequiredPaths(_keyBuilder, participantId); BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor(); baseAccessor.remove(paths, 0); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java index c65cb44..7041c5e 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java @@ -129,7 +129,7 @@ public class ResourceAccessor { * @param resourceId * @param configuration */ - void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) { + private void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) { _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration); // also set an ideal state if the resource supports it RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration); @@ -278,7 +278,7 @@ public class ResourceAccessor { * @return true if they were reset, false otherwise */ public boolean resetResources(Set<ResourceId> resetResourceIdSet) { - ParticipantAccessor accessor = new ParticipantAccessor(_accessor); + ParticipantAccessor accessor = participantAccessor(); List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews()); for (ExternalView extView : extViews) { if (!resetResourceIdSet.contains(extView.getResourceId())) { @@ -436,4 +436,12 @@ public class ResourceAccessor { return new Resource(resourceId, type, idealState, resourceAssignment, externalView, rebalancerContext, userConfig, bucketSize, batchMessageMode); } + + /** + * Get a ParticipantAccessor instance + * @return ParticipantAccessor + */ + protected ParticipantAccessor participantAccessor() { + return new ParticipantAccessor(_accessor); + } }
