Updated Branches: refs/heads/helix-logical-model cb3051241 -> b40608916
[HELIX-268] Atomic API for cluster, resource, participant Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/b4060891 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/b4060891 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/b4060891 Branch: refs/heads/helix-logical-model Commit: b40608916362be5a72698f15cb00ba3e1bbb800b Parents: cb30512 Author: Kanak Biscuitwala <[email protected]> Authored: Tue Oct 8 14:12:59 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Tue Oct 8 14:12:59 2013 -0700 ---------------------------------------------------------------------- .../api/accessor/AtomicClusterAccessor.java | 63 +++-- .../api/accessor/AtomicParticipantAccessor.java | 232 +++++++++++++++++++ .../api/accessor/AtomicResourceAccessor.java | 133 +++++++++++ .../helix/api/accessor/ClusterAccessor.java | 7 +- .../helix/api/accessor/ParticipantAccessor.java | 17 +- .../org/apache/helix/lock/zk/ZKHelixLock.java | 15 +- 6 files changed, 439 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java index ff8ab6e..a2af79b 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -1,19 +1,5 @@ package org.apache.helix.api.accessor; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.api.Cluster; -import org.apache.helix.api.Scope; -import org.apache.helix.api.config.ClusterConfig; -import org.apache.helix.api.config.ParticipantConfig; -import org.apache.helix.api.config.ResourceConfig; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.lock.HelixLock; -import org.apache.helix.lock.HelixLockable; -import org.apache.log4j.Logger; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -33,6 +19,27 @@ import org.apache.log4j.Logger; * under the License. */ +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.Cluster; +import org.apache.helix.api.Resource; +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.ClusterConfig; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.lock.HelixLock; +import org.apache.helix.lock.HelixLockable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + /** * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside @@ -43,9 +50,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class); private final HelixLockable _lockProvider; - @SuppressWarnings("unused") private final HelixDataAccessor _accessor; - @SuppressWarnings("unused") private final PropertyKey.Builder _keyBuilder; private final ClusterId _clusterId; @@ -182,7 +187,7 @@ public class AtomicClusterAccessor extends ClusterAccessor { return null; } ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig()); - boolean status = setBasicClusterConfig(config); + boolean status = super.setBasicClusterConfig(config); return status ? config : null; } finally { lock.unlock(); @@ -190,4 +195,28 @@ public class AtomicClusterAccessor extends ClusterAccessor { } return null; } + + /** + * Read resources atomically. This is resource-atomic, not cluster-atomic + */ + @Override + public Map<ResourceId, Resource> readResources() { + // read resources individually instead of together to maintain the equality link between ideal + // state and resource config + Map<ResourceId, Resource> resources = Maps.newHashMap(); + Set<String> idealStateNames = + Sets.newHashSet(_accessor.getChildNames(_keyBuilder.idealStates())); + Set<String> resourceConfigNames = + Sets.newHashSet(_accessor.getChildNames(_keyBuilder.resourceConfigs())); + resourceConfigNames.addAll(idealStateNames); + ResourceAccessor accessor = new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider); + for (String resourceName : resourceConfigNames) { + ResourceId resourceId = ResourceId.from(resourceName); + Resource resource = accessor.readResource(resourceId); + if (resource != null) { + resources.put(resourceId, resource); + } + } + return resources; + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java new file mode 100644 index 0000000..8482208 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java @@ -0,0 +1,232 @@ +package org.apache.helix.api.accessor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.Participant; +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.MessageId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext; +import org.apache.helix.lock.HelixLock; +import org.apache.helix.lock.HelixLockable; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; +import org.apache.log4j.Logger; + +/** + * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of + * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside + * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition + * may fail, in which case users should handle the return value of each function if necessary. + */ +public class AtomicParticipantAccessor extends ParticipantAccessor { + private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class); + + private final ClusterId _clusterId; + private final HelixDataAccessor _accessor; + private final PropertyKey.Builder _keyBuilder; + private final HelixLockable _lockProvider; + + public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor, + HelixLockable lockProvider) { + super(accessor); + _clusterId = clusterId; + _accessor = accessor; + _keyBuilder = accessor.keyBuilder(); + _lockProvider = lockProvider; + } + + @Override + public Participant readParticipant(ParticipantId participantId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.readParticipant(participantId); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public boolean setParticipant(ParticipantConfig participantConfig) { + if (participantConfig == null) { + LOG.error("participant config cannot be null"); + return false; + } + HelixLock lock = + _lockProvider.getLock(_clusterId, Scope.participant(participantConfig.getId())); + boolean locked = lock.lock(); + if (locked) { + try { + return super.setParticipant(participantConfig); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public ParticipantConfig updateParticipant(ParticipantId participantId, + ParticipantConfig.Delta participantDelta) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + Participant participant = super.readParticipant(participantId); + if (participant == null) { + LOG.error("Participant " + participantId + " does not exist, cannot be updated"); + return null; + } + ParticipantConfig config = participantDelta.mergeInto(participant.getConfig()); + super.setParticipant(config); + return config; + } finally { + lock.unlock(); + } + } + return null; + } + + /** + * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is + * participant-local and resource-local. + */ + @Override + public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) { + Participant oldParticipant = readParticipant(oldParticipantId); + if (oldParticipant == null) { + LOG.error("Could not swap participants because the old participant does not exist"); + return false; + } + if (oldParticipant.isEnabled()) { + LOG.error("Could not swap participants because the old participant is still enabled"); + return false; + } + if (oldParticipant.isAlive()) { + LOG.error("Could not swap participants because the old participant is still live"); + return false; + } + Participant newParticipant = readParticipant(newParticipantId); + if (newParticipant == null) { + LOG.error("Could not swap participants because the new participant does not exist"); + return false; + } + dropParticipant(oldParticipantId); + ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor); + List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates()); + for (String resourceName : idealStates) { + HelixLock lock = + _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName))); + boolean locked = lock.lock(); + if (locked) { + try { + // lock the resource for all ideal state reads and updates + IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName)); + if (idealState != null) { + swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId); + PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState); + resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context); + _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState); + } + } finally { + lock.unlock(); + } + } else { + return false; + } + } + return true; + } + + @Override + boolean dropParticipant(ParticipantId participantId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.dropParticipant(participantId); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public void insertMessagesToParticipant(ParticipantId participantId, + Map<MessageId, Message> msgMap) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + super.insertMessagesToParticipant(participantId, msgMap); + } finally { + lock.unlock(); + } + } + return; + } + + @Override + public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + super.updateMessageStatus(participantId, msgMap); + } finally { + lock.unlock(); + } + } + return; + } + + @Override + public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + super.deleteMessagesFromParticipant(participantId, msgIdSet); + } finally { + lock.unlock(); + } + } + return; + } + + @Override + protected ResourceAccessor resourceAccessor() { + return new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java new file mode 100644 index 0000000..4ff50d7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java @@ -0,0 +1,133 @@ +package org.apache.helix.api.accessor; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.api.Resource; +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.controller.rebalancer.context.RebalancerContext; +import org.apache.helix.lock.HelixLock; +import org.apache.helix.lock.HelixLockable; +import org.apache.log4j.Logger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of + * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside + * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition + * may fail, in which case users should handle the return value of each function if necessary. + */ +public class AtomicResourceAccessor extends ResourceAccessor { + private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class); + + private final ClusterId _clusterId; + private final HelixLockable _lockProvider; + + public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor, + HelixLockable lockProvider) { + super(accessor); + _clusterId = clusterId; + _lockProvider = lockProvider; + } + + @Override + public Resource readResource(ResourceId resourceId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.readResource(resourceId); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + Resource resource = super.readResource(resourceId); + if (resource == null) { + LOG.error("Resource " + resourceId + " does not exist, cannot be updated"); + return null; + } + ResourceConfig config = resourceDelta.mergeInto(resource.getConfig()); + super.setResource(config); + return config; + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.setRebalancerContext(resourceId, context); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean setResource(ResourceConfig resourceConfig) { + if (resourceConfig == null) { + LOG.error("resource config cannot be null"); + return false; + } + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceConfig.getId())); + boolean locked = lock.lock(); + if (locked) { + try { + return super.setResource(resourceConfig); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount, + String participantGroupTag) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag); + } finally { + lock.unlock(); + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java index 3548c82..bec7308 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java @@ -81,6 +81,11 @@ public class ClusterAccessor { private final PropertyKey.Builder _keyBuilder; private final ClusterId _clusterId; + /** + * Instantiate a cluster accessor + * @param clusterId the cluster to access + * @param accessor HelixDataAccessor for the physical store + */ public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) { _accessor = accessor; _keyBuilder = accessor.keyBuilder(); @@ -281,7 +286,7 @@ public class ClusterAccessor { } /** - * Read all resource in the cluster + * Read all resources in the cluster * @return map of resource id to resource */ public Map<ResourceId, Resource> readResources() { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java index 7952761..50945aa 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java @@ -205,9 +205,6 @@ public class ParticipantAccessor { } } - // TODO merge list logic should go to znrecord updater - // update participantConfig - // could not use ZNRecordUpdater since it doesn't do listField merge/subtract BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor(); final List<String> partitionNames = new ArrayList<String>(); for (PartitionId partitionId : partitionIdSet) { @@ -306,7 +303,7 @@ public class ParticipantAccessor { RunningInstance runningInstance = participant.getRunningInstance(); // check that the resource exists - ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor); + ResourceAccessor resourceAccessor = resourceAccessor(); Resource resource = resourceAccessor.readResource(resourceId); if (resource == null || resource.getRebalancerConfig() == null) { LOG.error("Cannot reset partitions because the resource is not present"); @@ -676,8 +673,8 @@ public class ParticipantAccessor { * @param oldParticipantId the participant to drop * @param newParticipantId the participant that replaces it */ - private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId, - ParticipantId newParticipantId) { + protected void swapParticipantsInIdealState(IdealState idealState, + ParticipantId oldParticipantId, ParticipantId newParticipantId) { for (PartitionId partitionId : idealState.getPartitionSet()) { List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId); if (oldPreferenceList != null) { @@ -702,4 +699,12 @@ public class ParticipantAccessor { } } } + + /** + * Get a ResourceAccessor instance + * @return ResourceAccessor + */ + protected ResourceAccessor resourceAccessor() { + return new ResourceAccessor(_accessor); + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java index 6f09c5e..7ddbce1 100644 --- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java @@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper; /** * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation + * Please use the following lock order convention: Cluster, Participant, Resource, Partition */ public class ZKHelixLock implements HelixLock { private static final Logger LOG = Logger.getLogger(ZKHelixLock.class); @@ -42,8 +43,8 @@ public class ZKHelixLock implements HelixLock { private final String _rootPath; private final WriteLock _writeLock; private final ZkClient _zkClient; - private boolean _locked; - private boolean _canceled; + private volatile boolean _locked; + private volatile boolean _canceled; private final LockListener _listener = new LockListener() { @Override @@ -55,10 +56,10 @@ public class ZKHelixLock implements HelixLock { synchronized (ZKHelixLock.this) { if (!_canceled) { _locked = true; - ZKHelixLock.this.notify(); } else { unlock(); } + ZKHelixLock.this.notify(); } } }; @@ -115,7 +116,13 @@ public class ZKHelixLock implements HelixLock { * @return true if unlock executed, false otherwise */ public synchronized boolean unlock() { - _writeLock.unlock(); + try { + _writeLock.unlock(); + } catch (IllegalArgumentException e) { + if (LOG.isInfoEnabled()) { + LOG.info("Unlock skipped because lock node was not present"); + } + } _locked = false; return true; }
