Updated Branches: refs/heads/helix-logical-model 48a5f48f3 -> cb3051241
[HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/cb305124 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/cb305124 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/cb305124 Branch: refs/heads/helix-logical-model Commit: cb3051241d99ccb68674e5b37feba133fe2d5286 Parents: 48a5f48 Author: Kanak Biscuitwala <[email protected]> Authored: Mon Oct 7 18:04:51 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Mon Oct 7 18:04:51 2013 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/api/Scope.java | 8 + .../api/accessor/AtomicClusterAccessor.java | 193 ++++++++++++ .../helix/api/accessor/ClusterAccessor.java | 54 ++-- .../helix/api/accessor/ParticipantAccessor.java | 2 - .../java/org/apache/helix/lock/HelixLock.java | 37 +++ .../org/apache/helix/lock/HelixLockable.java | 36 +++ .../org/apache/helix/lock/zk/LockListener.java | 39 +++ .../apache/helix/lock/zk/ProtocolSupport.java | 191 ++++++++++++ .../org/apache/helix/lock/zk/WriteLock.java | 294 +++++++++++++++++++ .../org/apache/helix/lock/zk/ZKHelixLock.java | 167 +++++++++++ .../org/apache/helix/lock/zk/ZNodeName.java | 113 +++++++ .../helix/lock/zk/ZooKeeperOperation.java | 38 +++ 12 files changed, 1150 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/Scope.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/Scope.java b/helix-core/src/main/java/org/apache/helix/api/Scope.java index 7dc217c..26e09a9 100644 --- a/helix-core/src/main/java/org/apache/helix/api/Scope.java +++ b/helix-core/src/main/java/org/apache/helix/api/Scope.java @@ -60,6 +60,14 @@ public class Scope<T extends Id> { return getType() + "{" + getScopedId() + "}"; } + @Override + public boolean equals(Object that) { + if (that instanceof Scope) { + return this.toString().equals(that.toString()); + } + return false; + } + /** * Get the Helix entity type that this scope covers * @return scope type http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java new file mode 100644 index 0000000..ff8ab6e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -0,0 +1,193 @@ +package org.apache.helix.api.accessor; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.Cluster; +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.ClusterConfig; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.lock.HelixLock; +import org.apache.helix.lock.HelixLockable; +import org.apache.log4j.Logger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of + * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside + * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition + * may fail, in which case users should handle the return value of each function if necessary. + */ +public class AtomicClusterAccessor extends ClusterAccessor { + private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class); + + private final HelixLockable _lockProvider; + @SuppressWarnings("unused") + private final HelixDataAccessor _accessor; + @SuppressWarnings("unused") + private final PropertyKey.Builder _keyBuilder; + private final ClusterId _clusterId; + + /** + * Instantiate the accessor + * @param clusterId the cluster to access + * @param accessor a HelixDataAccessor for the physical properties + * @param lockProvider a lock provider + */ + public AtomicClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor, + HelixLockable lockProvider) { + super(clusterId, accessor); + _lockProvider = lockProvider; + _accessor = accessor; + _keyBuilder = accessor.keyBuilder(); + _clusterId = clusterId; + } + + @Override + public boolean createCluster(ClusterConfig cluster) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.createCluster(cluster); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean dropCluster() { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.dropCluster(); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public Cluster readCluster() { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.readCluster(); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public boolean addParticipantToCluster(ParticipantConfig participant) { + if (participant == null) { + LOG.error("Participant config cannot be null"); + return false; + } + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participant.getId())); + boolean locked = lock.lock(); + if (locked) { + try { + return super.addParticipantToCluster(participant); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean dropParticipantFromCluster(ParticipantId participantId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.dropParticipantFromCluster(participantId); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean addResourceToCluster(ResourceConfig resource) { + if (resource == null) { + LOG.error("Resource config cannot be null"); + return false; + } + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resource.getId())); + boolean locked = lock.lock(); + if (locked) { + try { + return super.addResourceToCluster(resource); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public boolean dropResourceFromCluster(ResourceId resourceId) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + return super.dropResourceFromCluster(resourceId); + } finally { + lock.unlock(); + } + } + return false; + } + + @Override + public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) { + HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId)); + boolean locked = lock.lock(); + if (locked) { + try { + Cluster cluster = super.readCluster(); + if (cluster == null) { + LOG.error("Cluster does not exist, cannot be updated"); + return null; + } + ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig()); + boolean status = setBasicClusterConfig(config); + return status ? config : null; + } finally { + lock.unlock(); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java index 8780115..3548c82 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java @@ -20,6 +20,7 @@ package org.apache.helix.api.accessor; */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -91,11 +92,12 @@ public class ClusterAccessor { * @return true if created, false if creation failed */ public boolean createCluster(ClusterConfig cluster) { - boolean created = _accessor.createProperty(_keyBuilder.cluster(), null); - if (!created) { + ClusterConfiguration configuration = _accessor.getProperty(_keyBuilder.clusterConfig()); + if (configuration != null && isClusterStructureValid()) { LOG.error("Cluster already created. Aborting."); return false; } + clearClusterStructure(); initClusterStructure(); Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap(); for (StateModelDefinition stateModelDef : stateModelDefs.values()) { @@ -121,10 +123,10 @@ public class ClusterAccessor { if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) { _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats()); } - _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig); if (cluster.isPaused()) { pauseCluster(); } + _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig); return true; } @@ -150,7 +152,7 @@ public class ClusterAccessor { * @param config ClusterConfig * @return true if correctly set, false otherwise */ - private boolean setBasicClusterConfig(ClusterConfig config) { + protected boolean setBasicClusterConfig(ClusterConfig config) { if (config == null) { return false; } @@ -199,9 +201,13 @@ public class ClusterAccessor { /** * read entire cluster data - * @return cluster snapshot + * @return cluster snapshot or null */ public Cluster readCluster() { + if (!isClusterStructureValid()) { + LOG.error("Cluster is not fully set up"); + return null; + } LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader()); /** @@ -279,6 +285,11 @@ public class ClusterAccessor { * @return map of resource id to resource */ public Map<ResourceId, Resource> readResources() { + if (!isClusterStructureValid()) { + LOG.error("Cluster is not fully set up yet!"); + return Collections.emptyMap(); + } + /** * map of resource-id to ideal-state */ @@ -319,9 +330,14 @@ public class ClusterAccessor { /** * Read all participants in the cluster - * @return map of participant id to participant + * @return map of participant id to participant, or empty map */ public Map<ParticipantId, Participant> readParticipants() { + if (!isClusterStructureValid()) { + LOG.error("Cluster is not fully set up yet!"); + return Collections.emptyMap(); + } + /** * map of instance-id to instance-config */ @@ -667,18 +683,8 @@ public class ClusterAccessor { * @return true if valid or false otherwise */ public boolean isClusterStructureValid() { - return isClusterStructureValid(_clusterId, _accessor.getBaseDataAccessor()); - } - - /** - * check if cluster structure is valid - * @param clusterId the cluster to check - * @param baseAccessor a base data accessor - * @return true if valid or false otherwise - */ - private static boolean isClusterStructureValid(ClusterId clusterId, - BaseDataAccessor<?> baseAccessor) { - List<String> paths = getRequiredPaths(clusterId); + List<String> paths = getRequiredPaths(_clusterId); + BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor(); if (baseAccessor != null) { boolean[] existsResults = baseAccessor.exists(paths, 0); for (boolean exists : existsResults) { @@ -693,7 +699,7 @@ public class ClusterAccessor { /** * Create empty persistent properties to ensure that there is a valid cluster structure */ - private void initClusterStructure() { + public void initClusterStructure() { BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor(); List<String> paths = getRequiredPaths(_clusterId); for (String path : paths) { @@ -705,6 +711,15 @@ public class ClusterAccessor { } /** + * Remove all but the top level cluster node; intended for reconstructing the cluster + */ + private void clearClusterStructure() { + BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor(); + List<String> paths = getRequiredPaths(_clusterId); + baseAccessor.remove(paths, 0); + } + + /** * Get all property paths that must be set for a cluster structure to be valid * @param the cluster that the paths will be relative to * @return list of paths as strings @@ -712,7 +727,6 @@ public class ClusterAccessor { private static List<String> getRequiredPaths(ClusterId clusterId) { PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify()); List<String> paths = new ArrayList<String>(); - paths.add(keyBuilder.cluster().getPath()); paths.add(keyBuilder.clusterConfigs().getPath()); paths.add(keyBuilder.instanceConfigs().getPath()); paths.add(keyBuilder.propertyStore().getPath()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java index c53bcd8..7952761 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java @@ -618,12 +618,10 @@ public class ParticipantAccessor { boolean dropParticipant(ParticipantId participantId) { if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) { LOG.error("Config for participant: " + participantId + " does NOT exist in cluster"); - return false; } if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) { LOG.error("Participant: " + participantId + " structure does NOT exist in cluster"); - return false; } // delete participant config path http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java new file mode 100644 index 0000000..79c15d0 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java @@ -0,0 +1,37 @@ +package org.apache.helix.lock; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Generic (distributed) lock for Helix-related persisted updates + */ +public interface HelixLock { + /** + * Synchronously acquire a lock + * @return true if the lock was acquired, false if could not be acquired + */ + public boolean lock(); + + /** + * Release a lock + * @return true if the lock was released, false if it could not be released + */ + public boolean unlock(); +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java new file mode 100644 index 0000000..fdb2ca5 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java @@ -0,0 +1,36 @@ +package org.apache.helix.lock; + +import org.apache.helix.api.Scope; +import org.apache.helix.api.id.ClusterId; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Implemented by any Helix construct that is lockable and is able to return a HelixLock instance + */ +public interface HelixLockable { + /** + * Get a lock object on a scope + * @param clusterId cluster to lock + * @param scope scope relative to the cluster that the lock protects + * @return HelixLock instance + */ + HelixLock getLock(ClusterId clusterId, Scope<?> scope); +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java new file mode 100644 index 0000000..bb2118c --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java @@ -0,0 +1,39 @@ +package org.apache.helix.lock.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * This class has two methods which are call + * back methods when a lock is acquired and + * when the lock is released. + */ +interface LockListener { + /** + * call back called when the lock + * is acquired + */ + public void lockAcquired(); + + /** + * call back called when the lock is + * released. + */ + public void lockReleased(); +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java new file mode 100644 index 0000000..23bef6a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java @@ -0,0 +1,191 @@ +package org.apache.helix.lock.zk; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * A base class for protocol implementations which provides a number of higher + * level helper methods for working with ZooKeeper along with retrying synchronous + * operations if the connection to ZooKeeper closes such as + * {@link #retryOperation(ZooKeeperOperation)} + */ +class ProtocolSupport { + private static final Logger LOG = Logger.getLogger(ProtocolSupport.class); + + protected final ZooKeeper zookeeper; + private AtomicBoolean closed = new AtomicBoolean(false); + private long retryDelay = 500L; + private int retryCount = 10; + private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + public ProtocolSupport(ZooKeeper zookeeper) { + this.zookeeper = zookeeper; + } + + /** + * Closes this strategy and releases any ZooKeeper resources; but keeps the + * ZooKeeper instance open + */ + public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + /** + * return zookeeper client instance + * @return zookeeper client instance + */ + public ZooKeeper getZookeeper() { + return zookeeper; + } + + /** + * return the acl its using + * @return the acl. + */ + public List<ACL> getAcl() { + return acl; + } + + /** + * set the acl + * @param acl the acl to set to + */ + public void setAcl(List<ACL> acl) { + this.acl = acl; + } + + /** + * get the retry delay in milliseconds + * @return the retry delay + */ + public long getRetryDelay() { + return retryDelay; + } + + /** + * Sets the time waited between retry delays + * @param retryDelay the retry delay + */ + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + /** + * Allow derived classes to perform + * some custom closing operations to release resources + */ + protected void doClose() { + } + + /** + * Perform the given operation, retrying if the connection fails + * @return object. it needs to be cast to the callee's expected + * return type. + */ + protected Object retryOperation(ZooKeeperOperation operation) throws KeeperException, + InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return operation.execute(); + } catch (KeeperException.SessionExpiredException e) { + LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e); + throw e; + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + LOG.debug("Attempt " + i + " failed with connection loss so " + "attempting to reconnect: " + + e, e); + retryDelay(i); + } + } + throw exception; + } + + /** + * Ensures that the given path exists with no data, the current + * ACL and no flags + * @param path + */ + protected void ensurePathExists(String path) { + ensureExists(path, null, acl, CreateMode.PERSISTENT); + } + + /** + * Ensures that the given path exists with the given data, ACL and flags + * @param path + * @param acl + * @param flags + */ + protected void ensureExists(final String path, final byte[] data, final List<ACL> acl, + final CreateMode flags) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + Stat stat = zookeeper.exists(path, false); + if (stat != null) { + return true; + } + zookeeper.create(path, data, acl, flags); + return true; + } + }); + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + } + } + + /** + * Returns true if this protocol has been closed + * @return true if this protocol is closed + */ + protected boolean isClosed() { + return closed.get(); + } + + /** + * Performs a retry delay if this is not the first attempt + * @param attemptCount the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) { + if (attemptCount > 0) { + try { + Thread.sleep(attemptCount * retryDelay); + } catch (InterruptedException e) { + LOG.debug("Failed to sleep: " + e, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java new file mode 100644 index 0000000..aef7618 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java @@ -0,0 +1,294 @@ +package org.apache.helix.lock.zk; + +import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * A protocol to implement an exclusive + * write lock or to elect a leader. + * <p/> + * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then + * or it may be some time later. + * <p/> + * You can register a listener so that you are invoked when you get the lock; otherwise you can ask + * if you have the lock by calling {@link #isOwner()} + */ +class WriteLock extends ProtocolSupport { + private static final Logger LOG = Logger.getLogger(WriteLock.class); + + private final String dir; + private String id; + private ZNodeName idName; + private String ownerId; + private String lastChildId; + private byte[] data = { + 0x12, 0x34 + }; + private LockListener callback; + private LockZooKeeperOperation zop; + + /** + * zookeeper contructor for writelock + * @param zookeeper zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acls the acls that you want to use for all the paths, + * if null world read/write is used. + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) { + super(zookeeper); + this.dir = dir; + if (acl != null) { + setAcl(acl); + } + this.zop = new LockZooKeeperOperation(); + } + + /** + * zookeeper contructor for writelock with callback + * @param zookeeper the zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acl the acls that you want to use for all the paths + * @param callback the call back instance + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, LockListener callback) { + this(zookeeper, dir, acl); + this.callback = callback; + } + + /** + * return the current locklistener + * @return the locklistener + */ + public LockListener getLockListener() { + return this.callback; + } + + /** + * register a different call back listener + * @param callback the call back instance + */ + public void setLockListener(LockListener callback) { + this.callback = callback; + } + + /** + * Removes the lock or associated znode if + * you no longer require the lock. this also + * removes your request in the queue for locking + * in case you do not already hold the lock. + * @throws RuntimeException throws a runtime exception + * if it cannot connect to zookeeper. + */ + public synchronized void unlock() throws RuntimeException { + + if (!isClosed() && id != null) { + // we don't need to retry this operation in the case of failure + // as ZK will remove ephemeral files and we don't wanna hang + // this process when closing if we cannot reconnect to ZK + try { + + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + // set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e); + } finally { + if (callback != null) { + callback.lockReleased(); + } + id = null; + } + } + } + + /** + * the watcher called on + * getting watch while watching + * my predecessor + */ + private class LockWatcher implements Watcher { + public void process(WatchedEvent event) { + // lets either become the leader or watch the new/updated node + LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + + " type " + event.getType()); + try { + lock(); + } catch (Exception e) { + LOG.warn("Failed to acquire lock: " + e, e); + } + } + } + + /** + * a zoookeeper operation that is mainly responsible + * for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** + * find if we have been created earler if not create our node + * @param prefix the prefix node + * @param zookeeper teh zookeeper client + * @param dir the dir paretn + * @throws KeeperException + * @throws InterruptedException + */ + private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) + throws KeeperException, InterruptedException { + List<String> names = zookeeper.getChildren(dir, false); + for (String name : names) { + if (name.startsWith(prefix)) { + id = name; + if (LOG.isDebugEnabled()) { + LOG.debug("Found id created last time: " + id); + } + break; + } + } + if (id == null) { + id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created id: " + id); + } + } + + } + + /** + * the command that is run and retried for actually + * obtaining the lock + * @return if the command was successful or not + */ + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + long sessionId = zookeeper.getSessionId(); + String prefix = "x-" + sessionId + "-"; + // lets try look up the current ID if we failed + // in the middle of creating the znode + findPrefixInChildren(prefix, zookeeper, dir); + idName = new ZNodeName(id); + } + if (id != null) { + List<String> names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.warn("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order ususally :) + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + ownerId = sortedNames.first().getName(); + SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); + if (stat != null) { + return Boolean.FALSE; + } else { + LOG.warn("Could not find the" + " stats for less than me: " + + lastChildName.getName()); + } + } else { + if (isOwner()) { + if (callback != null) { + callback.lockAcquired(); + } + return Boolean.TRUE; + } + } + } + } + } while (id == null); + return Boolean.FALSE; + } + }; + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was + * acquired. Note that the exclusive lock may be acquired some time later after + * this method has been invoked due to the current lock owner going away. + */ + public synchronized boolean lock() throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + ensurePathExists(dir); + + return (Boolean) retryOperation(zop); + } + + /** + * return the parent dir for lock + * @return the parent dir used for locks. + */ + public String getDir() { + return dir; + } + + /** + * Returns true if this node is the owner of the + * lock (or the leader) + */ + public boolean isOwner() { + return id != null && ownerId != null && id.equals(ownerId); + } + + /** + * return the id for this lock + * @return the id for this lock + */ + public String getId() { + return this.id; + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java new file mode 100644 index 0000000..6f09c5e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java @@ -0,0 +1,167 @@ +package org.apache.helix.lock.zk; + +import org.I0Itec.zkclient.ZkConnection; +import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ZNRecord; +import org.apache.helix.api.Scope; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.lock.HelixLock; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation + */ +public class ZKHelixLock implements HelixLock { + private static final Logger LOG = Logger.getLogger(ZKHelixLock.class); + + private static final String LOCK_ROOT = "LOCKS"; + private final String _rootPath; + private final WriteLock _writeLock; + private final ZkClient _zkClient; + private boolean _locked; + private boolean _canceled; + + private final LockListener _listener = new LockListener() { + @Override + public void lockReleased() { + } + + @Override + public void lockAcquired() { + synchronized (ZKHelixLock.this) { + if (!_canceled) { + _locked = true; + ZKHelixLock.this.notify(); + } else { + unlock(); + } + } + } + }; + + /** + * Initialize for a cluster and scope + * @param clusterId the cluster under which the lock will live + * @param scope the scope to lock + * @param zkClient an active ZK client + */ + public ZKHelixLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) { + _zkClient = zkClient; + _rootPath = + '/' + clusterId.stringify() + '/' + LOCK_ROOT + '/' + scope.getType() + '_' + + scope.getScopedId(); + ZooKeeper zookeeper = ((ZkConnection) zkClient.getConnection()).getZookeeper(); + _writeLock = new WriteLock(zookeeper, _rootPath, null, _listener); + _locked = false; + _canceled = false; + } + + /** + * Try to synchronously lock the scope + * @return true if the lock succeeded, false if it failed, as is the case if the connection to ZK + * is lost + */ + public synchronized boolean lock() { + _canceled = false; + if (_locked) { + // no need to proceed if the lock is already acquired + return true; + } + try { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + baseAccessor.create(_rootPath, null, AccessOption.PERSISTENT); + boolean acquired = _writeLock.lock(); + if (acquired) { + _locked = true; + } else { + wait(); + } + } catch (KeeperException e) { + LOG.error("Error acquiring a lock on " + _rootPath, e); + _canceled = true; + } catch (InterruptedException e) { + LOG.error("Interrupted while acquiring a lock on " + _rootPath); + _canceled = true; + } + return _locked; + } + + /** + * Unlock the scope + * @return true if unlock executed, false otherwise + */ + public synchronized boolean unlock() { + _writeLock.unlock(); + _locked = false; + return true; + } + + public static void main(String[] args) { + ZkClient zkClient = new ZkClient("localhost:2199"); + ClusterId clusterId = ClusterId.from("exampleCluster"); + final ZKHelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient); + final ZKHelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient); + System.err.println("lock1 started"); + boolean result = lock1.lock(); + System.err.println("lock1 finished " + result); + new Thread() { + @Override + public void run() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.err.println("unlock1 started"); + lock1.unlock(); + System.err.println("unlock1 finished"); + } + }.start(); + final Thread t1 = new Thread() { + @Override + public void run() { + System.err.println("lock2 started"); + boolean locked = lock2.lock(); + System.err.println("lock2 finished " + locked); + } + }; + t1.start(); + new Thread() { + @Override + public void run() { + try { + Thread.sleep(5000); + System.err.println("interrupt2 start"); + t1.interrupt(); + System.err.println("interrupt2 finished"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java new file mode 100644 index 0000000..47253e6 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java @@ -0,0 +1,113 @@ +package org.apache.helix.lock.zk; + +import org.apache.log4j.Logger; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Represents an ephemeral znode name which has an ordered sequence number + * and can be sorted in order + */ +class ZNodeName implements Comparable<ZNodeName> { + private final String name; + private String prefix; + private int sequence = -1; + private static final Logger LOG = Logger.getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + LOG.info("Number format exception for " + idx, e); + } catch (ArrayIndexOutOfBoundsException e) { + LOG.info("Array out of bounds for " + idx, e); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) + return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + public int compareTo(ZNodeName that) { + int answer = this.prefix.compareTo(that.prefix); + if (answer == 0) { + int s1 = this.sequence; + int s2 = that.sequence; + if (s1 == -1 && s2 == -1) { + return this.name.compareTo(that.name); + } + answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2; + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java new file mode 100644 index 0000000..58b9fe3 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java @@ -0,0 +1,38 @@ +package org.apache.helix.lock.zk; + +import org.apache.zookeeper.KeeperException; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * A callback object which can be used for implementing retry-able operations in the + * {@link org.apache.helix.lock.zk.recipes.lock.ProtocolSupport} class + */ +interface ZooKeeperOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * @return the result of the operation or null + * @throws KeeperException + * @throws InterruptedException + */ + public boolean execute() throws KeeperException, InterruptedException; +}
