Author: [email protected]
Date: Mon Dec  5 11:46:22 2011
New Revision: 1805

Log:


Added:
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/Lock.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/LockListener.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ProtocolSupport.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/WriteLock.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZNodeName.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZooKeeperOperation.java
Modified:
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/Lock.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/Lock.java 
    Mon Dec  5 11:46:22 2011
@@ -0,0 +1,5 @@
+package org.amdatu.zookeeper.lock;
+
+public class Lock {
+
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/LockListener.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/LockListener.java
     Mon Dec  5 11:46:22 2011
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.lock;
+
+/**
+ * Listener notified about lock state changes. It declares two callback
+ * methods, invoked when a lock is acquired and when the lock is released,
+ * plus a query method reporting whether the lock has been acquired.
+ *
+ */
+public interface LockListener {
+    /**
+     * Callback invoked when the lock 
+     * is acquired.
+     */
+    public void lockAcquired();
+    
+    /**
+     * Returns whether the lock has been acquired or not.
+     * @return true if the lock has been acquired.
+     */
+    public boolean isAcquired();
+    
+    /**
+     * Callback invoked when the lock is 
+     * released.
+     */
+    public void lockReleased();
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ProtocolSupport.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ProtocolSupport.java
  Mon Dec  5 11:46:22 2011
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.amdatu.zookeeper.lock.ZooKeeperOperation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for protocol implementations which provides a number of
+ * higher level helper methods for working with ZooKeeper, along with
+ * retrying synchronous operations if the connection to ZooKeeper closes,
+ * such as {@link #retryOperation(ZooKeeperOperation)}
+ *
+ */
+class ProtocolSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProtocolSupport.class);
+
+    protected final ZooKeeper zookeeper;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private long retryDelay = 500L;
+    private int retryCount = 10;
+    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    public ProtocolSupport(ZooKeeper zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    /**
+     * Closes this strategy and releases any ZooKeeper resources; but keeps the
+     *  ZooKeeper instance open
+     */
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            doClose();
+        }
+    }
+    
+    /**
+     * return zookeeper client instance
+     * @return zookeeper client instance
+     */
+    public ZooKeeper getZookeeper() {
+        return zookeeper;
+    }
+
+    /**
+     * return the acl its using
+     * @return the acl.
+     */
+    public List<ACL> getAcl() {
+        return acl;
+    }
+
+    /**
+     * set the acl 
+     * @param acl the acl to set to
+     */
+    public void setAcl(List<ACL> acl) {
+        this.acl = acl;
+    }
+
+    /**
+     * get the retry delay in milliseconds
+     * @return the retry delay
+     */
+    public long getRetryDelay() {
+        return retryDelay;
+    }
+
+    /**
+     * Sets the time waited between retry delays
+     * @param retryDelay the retry delay
+     */
+    public void setRetryDelay(long retryDelay) {
+        this.retryDelay = retryDelay;
+    }
+
+    /**
+     * Allow derived classes to perform 
+     * some custom closing operations to release resources
+     */
+    protected void doClose() {
+    }
+
+
+    /**
+     * Perform the given operation, retrying if the connection fails
+     * @return object. it needs to be cast to the callee's expected 
+     * return type.
+     */
+    protected Object retryOperation(ZooKeeperOperation operation) 
+        throws KeeperException, InterruptedException {
+        KeeperException exception = null;
+        for (int i = 0; i < retryCount; i++) {
+            try {
+                return operation.execute();
+            } catch (KeeperException.SessionExpiredException e) {
+                LOG.warn("Session expired for: " + zookeeper + " so 
reconnecting due to: " + e, e);
+                throw e;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (exception == null) {
+                    exception = e;
+                }
+                LOG.debug("Attempt " + i + " failed with connection loss so " +
+                               "attempting to reconnect: " + e, e);
+                retryDelay(i);
+            }
+        }
+        throw exception;
+    }
+
+    /**
+     * Ensures that the given path exists with no data, the current
+     * ACL and no flags
+     * @param path
+     */
+    protected void ensurePathExists(String path) {
+        ensureExists(path, null, acl, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * Ensures that the given path exists, creating it with the given data,
+     * ACL and flags when it does not. Failures are logged and not propagated
+     * to the caller.
+     * @param path the znode path that must exist
+     * @param data the data to store if the znode has to be created
+     * @param acl the ACL to apply if the znode has to be created
+     * @param flags the create mode to use if the znode has to be created
+     */
+    protected void ensureExists(final String path, final byte[] data,
+            final List<ACL> acl, final CreateMode flags) {
+        try {
+            retryOperation(new ZooKeeperOperation() {
+                public boolean execute()
+                    throws KeeperException, InterruptedException {
+                    Stat stat = zookeeper.exists(path, false);
+                    if (stat != null) {
+                        return true;
+                    }
+                    zookeeper.create(path, data, acl, flags);
+                    return true;
+                }
+            });
+        } catch (KeeperException e) {
+            LOG.warn("Caught: " + e, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Caught: " + e, e);
+            // The interruption is not rethrown, so restore the interrupt
+            // status to let code further up the stack still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Returns true if this protocol has been closed
+     * @return true if this protocol is closed
+     */
+    protected boolean isClosed() {
+        return closed.get();
+    }
+
+    /**
+     * Performs a retry delay if this is not the first attempt. The delay is
+     * the attempt count multiplied by the configured retry delay.
+     * @param attemptCount the number of the attempts performed so far
+     */
+    protected void retryDelay(int attemptCount) {
+        if (attemptCount > 0) {
+            try {
+                Thread.sleep(attemptCount * retryDelay);
+            } catch (InterruptedException e) {
+                LOG.debug("Failed to sleep: " + e, e);
+                // Keep the interruption visible to the caller instead of
+                // silently clearing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/WriteLock.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/WriteLock.java
        Mon Dec  5 11:46:22 2011
@@ -0,0 +1,329 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A <a href="package.html">protocol to implement an exclusive
+ *  write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to 
+ *  start the process of grabbing the lock; you may get the lock then or it
+ *  may be some time later. <p/> You can register a listener so that you are
+ *  invoked when you get the lock; otherwise you can ask if you have the lock
+ *  by calling {@link #isOwner()}
+ *
+ */
+public class WriteLock extends ProtocolSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class);
+
+    private final String dir;
+    private String id;
+    private ZNodeName idName;
+    private String ownerId;
+    private String lastChildId;
+    private byte[] data = {0x12, 0x34};
+    private LockListener callback;
+    private LockZooKeeperOperation zop;
+    
+    /**
+     * zookeeper constructor for writelock
+     * @param zookeeper zookeeper client instance
+     * @param dir the parent path you want to use for locking
+     * @param acl the acls that you want to use for all the paths, 
+     * if null world read/write is used.
+     */
+    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+        super(zookeeper);
+        this.dir = dir;
+        if (acl != null) {
+            setAcl(acl);
+        }
+        this.zop = new LockZooKeeperOperation();
+    }
+    
+    /**
+     * zookeeper constructor for writelock with callback
+     * @param zookeeper the zookeeper client instance
+     * @param dir the parent path you want to use for locking
+     * @param acl the acls that you want to use for all the paths
+     * @param callback the call back instance
+     */
+    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, 
+            LockListener callback) {
+        this(zookeeper, dir, acl);
+        this.callback = callback;
+    }
+
+    /**
+     * return the current locklistener
+     * @return the locklistener
+     */
+    public LockListener getLockListener() {
+        return this.callback;
+    }
+    
+    /**
+     * register a different call back listener
+     * @param callback the call back instance
+     */
+    public void setLockListener(LockListener callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * Removes the lock or associated znode if 
+     * you no longer require the lock. this also 
+     * removes your request in the queue for locking
+     * in case you do not already hold the lock.
+     * @throws RuntimeException throws a runtime exception
+     * if it cannot connect to zookeeper.
+     */
+    public synchronized void unlock() throws RuntimeException {
+        
+        if (!isClosed() && id != null) {
+            // we don't need to retry this operation in the case of failure
+            // as ZK will remove ephemeral files and we don't wanna hang
+            // this process when closing if we cannot reconnect to ZK
+            try {
+                
+                ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+                    public boolean execute() throws KeeperException,
+                        InterruptedException {
+                        zookeeper.delete(id, -1);   
+                        return Boolean.TRUE;
+                    }
+                };
+                zopdel.execute();
+            } catch (InterruptedException e) {
+                LOG.warn("Caught: " + e, e);
+                //set that we have been interrupted.
+               Thread.currentThread().interrupt();
+            } catch (KeeperException.NoNodeException e) {
+                // do nothing
+            } catch (KeeperException e) {
+                LOG.warn("Caught: " + e, e);
+                throw (RuntimeException) new RuntimeException(e.getMessage()).
+                    initCause(e);
+            }
+            finally {
+                if (callback != null) {
+                    callback.lockReleased();
+                }
+                id = null;
+            }
+        }
+    }
+    
+    /** 
+     * the watcher called on  
+     * getting watch while watching 
+     * my predecessor
+     */
+    private class LockWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+            // lets either become the leader or watch the new/updated node
+            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " 
+ 
+                    event.getState() + " type " + event.getType());
+            try {
+                lock();
+            } catch (Exception e) {
+                LOG.warn("Failed to acquire lock: " + e, e);
+            }
+        }
+    }
+    
+    /**
+     * a zoookeeper operation that is mainly responsible
+     * for all the magic required for locking.
+     */
+    private  class LockZooKeeperOperation implements ZooKeeperOperation {
+        
+        /**
+         * Finds the node we created earlier for this prefix, if any;
+         * otherwise creates our node. On return {@code id} holds the full
+         * path of our node.
+         * 
+         * @param prefix the prefix of our node's name
+         * @param zookeeper the zookeeper client
+         * @param dir the parent dir
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper,
+            String dir)
+            throws KeeperException, InterruptedException {
+            List<String> names = zookeeper.getChildren(dir, false);
+            for (String name : names) {
+                if (name.startsWith(prefix)) {
+                    // getChildren() yields names relative to dir, while the
+                    // id returned by create() below, ownerId and the delete
+                    // in unlock() all use full paths; qualify the name so a
+                    // recovered id is comparable with them.
+                    id = dir + "/" + name;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Found id created last time: " + id);
+                    }
+                    break;
+                }
+            }
+            if (id == null) {
+                id = zookeeper.create(dir + "/" + prefix, data, 
+                        getAcl(), EPHEMERAL_SEQUENTIAL);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Created id: " + id);
+                }
+            }
+
+        }
+        
+        /**
+         * the command that is run and retried for actually 
+         * obtaining the lock
+         * @return if the command was successful or not
+         */
+        public boolean execute() throws KeeperException, InterruptedException {
+            do {
+                if (id == null) {
+                    long sessionId = zookeeper.getSessionId();
+                    String prefix = "x-" + sessionId + "-";
+                    // lets try look up the current ID if we failed 
+                    // in the middle of creating the znode
+                    findPrefixInChildren(prefix, zookeeper, dir);
+                    idName = new ZNodeName(id);
+                }
+                if (id != null) {
+                    List<String> names = zookeeper.getChildren(dir, false);
+                    if (names.isEmpty()) {
+                        LOG.warn("No children in: " + dir + " when we've just 
" +
+                        "created one! Lets recreate it...");
+                        // lets force the recreation of the id
+                        id = null;
+                    } else {
+                        // lets sort them explicitly (though they do seem to 
come back in order ususally :)
+                        SortedSet<ZNodeName> sortedNames = new 
TreeSet<ZNodeName>();
+                        for (String name : names) {
+                            sortedNames.add(new ZNodeName(dir + "/" + name));
+                        }
+                        ownerId = sortedNames.first().getName();
+                        SortedSet<ZNodeName> lessThanMe = 
sortedNames.headSet(idName);
+                        if (!lessThanMe.isEmpty()) {
+                            ZNodeName lastChildName = lessThanMe.last();
+                            lastChildId = lastChildName.getName();
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("watching less than me node: " + 
lastChildId);
+                            }
+                            Stat stat = zookeeper.exists(lastChildId, new 
LockWatcher());
+                            if (stat != null) {
+                                return Boolean.FALSE;
+                            } else {
+                                LOG.warn("Could not find the" +
+                                               " stats for less than me: " + 
lastChildName.getName());
+                            }
+                        } else {
+                            if (isOwner()) {
+                                if (callback != null) {
+                                    callback.lockAcquired();
+                                }
+                                return Boolean.TRUE;
+                            }
+                        }
+                    }
+                }
+            }
+            while (id == null);
+            return Boolean.FALSE;
+        }
+    };
+
+    /**
+     * Attempts to acquire the exclusive write lock returning whether or not 
it was
+     * acquired. Note that the exclusive lock may be acquired some time later 
after
+     * this method has been invoked due to the current lock owner going away.
+     */
+    public synchronized boolean lock() throws KeeperException, 
InterruptedException {
+        if (isClosed()) {
+            return false;
+        }
+        ensurePathExists(dir);
+
+        return (Boolean) retryOperation(zop);
+    }
+    
+    /**
+     * Tries to acquire the lock and waits until it has been acquired or the
+     * timeout has expired. The state is polled every 500 milliseconds, so the
+     * call may return up to that long after the timeout. Note that this
+     * replaces any previously registered lock listener.
+     *
+     * @param timeout the maximum time to wait, in milliseconds
+     * @return true if the lock was acquired before this method returned
+     * @throws KeeperException if the lock attempt fails
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public boolean acquireLock(long timeout)
+        throws KeeperException, InterruptedException {
+        LockListener listener = new LockListener() {
+            // volatile: set by whichever thread runs lock() (possibly the
+            // LockWatcher callback) and polled below without holding the
+            // monitor of this WriteLock.
+            private volatile boolean acquired = false;
+            
+            public void lockAcquired() {
+                acquired = true;
+            }
+
+            public void lockReleased() {
+            }
+            
+            public boolean isAcquired() {
+                return acquired;
+            }
+        };
+        setLockListener(listener);
+        lock();
+        
+        // Wait until either the lock is acquired, or the timeout has expired
+        long expireTime = System.currentTimeMillis() + timeout;
+        while (!listener.isAcquired()
+                && System.currentTimeMillis() < expireTime) {
+            // Wait for 500 milliseconds
+            Thread.sleep(500);
+        }
+        
+        return listener.isAcquired();
+    }
+    
+    
+
+    /**
+     * return the parent dir for lock
+     * @return the parent dir used for locks.
+     */
+    public String getDir() {
+        return dir;
+    }
+
+    /**
+     * Returns true if this node is the owner of the
+     *  lock (or the leader)
+     */
+    public boolean isOwner() {
+        return id != null && ownerId != null && id.equals(ownerId);
+    }
+
+    /**
+     * return the id for this lock
+     * @return the id for this lock
+     */
+    public String getId() {
+       return this.id;
+    }
+}
+

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZNodeName.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZNodeName.java
        Mon Dec  5 11:46:22 2011
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order
+ *
+ */
+class ZNodeName implements Comparable<ZNodeName> {
+    private final String name;
+    private String prefix;
+    private int sequence = -1;
+    private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class);
+    
+    public ZNodeName(String name) {
+        if (name == null) {
+            throw new NullPointerException("id cannot be null");
+        }
+        this.name = name;
+        this.prefix = name;
+        int idx = name.lastIndexOf('-');
+        if (idx >= 0) {
+            this.prefix = name.substring(0, idx);
+            try {
+                this.sequence = Integer.parseInt(name.substring(idx + 1));
+                // If an exception occurred we misdetected a sequence suffix,
+                // so return -1.
+            } catch (NumberFormatException e) {
+                LOG.info("Number format exception for " + idx, e);
+            } catch (ArrayIndexOutOfBoundsException e) {
+               LOG.info("Array out of bounds for " + idx, e);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ZNodeName sequence = (ZNodeName) o;
+
+        if (!name.equals(sequence.name)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode() + 37;
+    }
+
+    /**
+     * Orders znodes by their sequence number, so that the node with the
+     * lowest sequence number sorts first regardless of its prefix. Names
+     * without a sequence number sort after all sequenced names and are
+     * ordered among themselves by name. Equal sequence numbers are ordered
+     * by name, so only equal names compare as 0.
+     *
+     * @param that the znode name to compare with
+     * @return a negative value, zero or a positive value if this name sorts
+     * before, equal to or after the given name
+     */
+    public int compareTo(ZNodeName that) {
+        int s1 = this.sequence;
+        int s2 = that.sequence;
+        if (s1 == -1 && s2 == -1) {
+            return this.name.compareTo(that.name);
+        }
+        if (s1 == -1) {
+            return 1;
+        }
+        if (s2 == -1) {
+            return -1;
+        }
+        if (s1 != s2) {
+            // The prefix may differ per client, so it must not take
+            // precedence over the sequence number.
+            return s1 < s2 ? -1 : 1;
+        }
+        return this.name.compareTo(that.name);
+    }
+
+    /**
+     * Returns the name of the znode
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the sequence number
+     */
+    public int getZNodeName() {
+        return sequence;
+    }
+
+    /**
+     * Returns the text prefix before the sequence number
+     */
+    public String getPrefix() {
+        return prefix;
+    }
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZooKeeperOperation.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/lock/ZooKeeperOperation.java
       Mon Dec  5 11:46:22 2011
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.lock;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations
+ * in the {@link ProtocolSupport} class
+ *
+ */
+public interface ZooKeeperOperation {
+    
+    /**
+     * Performs the operation - which may be invoked multiple times if the
+     * connection to ZooKeeper closes during this operation
+     *
+     * @return the boolean result of the operation
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public boolean execute() throws KeeperException, InterruptedException;
+}

Modified: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
==============================================================================
--- 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
      (original)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
      Mon Dec  5 11:46:22 2011
@@ -14,7 +14,7 @@
 {
     String znode;
 
-    DataMonitor dm;
+    //DataMonitor dm;
 
     ZooKeeper zk;
 
@@ -29,8 +29,8 @@
         String exec[]) throws KeeperException, IOException {
         this.filename = filename;
         this.exec = exec;
-        zk = new ZooKeeper(hostPort, 3000, this);
-        dm = new DataMonitor(zk, znode, null, this);
+        zk = new ZooKeeper(hostPort, 10000, this);
+        //dm = new DataMonitor(zk, znode, null, this);
     }
 
     public ZooKeeper getClient() {
@@ -68,13 +68,13 @@
      * @see 
org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
      */
     public void process(WatchedEvent event) {
-        dm.process(event);
+       // dm.process(event);
     }
 
     public void run() {
         try {
             synchronized (this) {
-                while (!dm.dead && !m_interrupt) {
+                while (!m_interrupt) {
                     wait();
                 }
             }

Modified: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
==============================================================================
--- 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
 (original)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
 Mon Dec  5 11:46:22 2011
@@ -1,42 +1,140 @@
 package org.amdatu.zookeeper.service;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.amdatu.zookeeper.lock.WriteLock;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
+/**
+ * We could use ZooKeeper for distributed locks (and more). It would be useful 
for Cassandra, 
+ * in which we need to acquire a shared lock before we update the schema (i.e. 
add keyspace, add CF,
+ * update CF, etc.). The disadvantage however is that in ZooKeeper, each 
server must know about 
+ * each other ZooKeeper server. ZooKeeper clients also connect to 1 or more 
servers, the more the 
+ * better. So this would cause issues with an elastic cluster; when a new node 
is added also the 
+ * configuration of each existing node must be updated with a new ZooKeeper 
server configuration.
+ * If the cassandra service would depend on the ZooKeeper service (for shared 
locks), the cassandra 
+ * service would be down for a moment, while ZooKeeper is restarting.
+ * 
+ * TODO: we should expose a ZooKeeperClient service which can be used to 
communicate with the client.
+ * Also WriteLock should be added to the API, which can be used to obtain 
shared locks.
+ * Without exposing its API, a ZooKeeperServer service should be added (exists 
already).
+ * @author ivol
+ */
+public class ZooKeeperExecutorImpl {
 
-public class ZooKeeperExecutorImpl{
- 
     private Executor executor;
-    
+    private ZooKeeper client;
+
     public void start() {
-       String[] args = new String[4];
-       args[0] = "localhost:2181";
-       args[1] = "/"; // path to monitor, '/' monitors all
-       args[2] = "work/zookeeper/executor"; // filename to write changes to
-       args[3] = "cmd.exe"; // process to be executed when a child is added. 
This is just an example
-       
-       // TODO: Add a distributed lock service, which we will be using for 
cassandra
-       // in cassandra we will set a lock to /cassandra/keyspace1/schema
-       // see http://zookeeper.apache.org/doc/trunk/recipes.html#Shared+Locks. 
-       executor = Executor.main(args);
-       
-       ZooKeeper client = getClient();
-       try {
-        byte[] result = client.getData("/zk_test", false, new Stat());
-        String g = new String(result);
-        int gss=0;
-    }
-    catch (KeeperException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        String[] args = new String[4];
+        args[0] = "127.0.0.1:2181";
+        args[1] = "/"; // path to monitor, '/' monitors all
+        args[2] = "work/zookeeper/executor"; // filename to write changes to
+        args[3] = "cmd.exe"; // process to be executed when a child is added. 
This is just an example
+
+        // TODO: Add a distributed lock service, which we will be using for 
cassandra
+        // in cassandra we will set a lock to /cassandra/keyspace1/schema
+        // see http://zookeeper.apache.org/doc/trunk/recipes.html#Shared+Locks.
+
+        // CLI example:
+        // create /cassandra 1
+        // create /cassandra/keyspace1 1
+
+        executor = Executor.main(args);
+
+        test();
     }
-    catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+
+    private void test() {
+        try {
+            client = getClient();
+            try {
+                System.out.println("State: " + client.getState());
+                Stat stat = client.exists("/cassandra", false);
+                if (stat == null) {
+                    client.create("/cassandra", "1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+                
+                stat = client.exists("/cassandra/keyspace1", false);
+                if (stat == null) {
+                    client.create("/cassandra/keyspace1", "1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+            }
+            catch (KeeperException e1) {
+                // TODO Auto-generated catch block
+                e1.printStackTrace();
+            }
+            catch (InterruptedException e1) {
+                // TODO Auto-generated catch block
+                e1.printStackTrace();
+            }
+
+            if (client.getState() == ZooKeeper.States.CONNECTED) {
+
+                List<ZooThread> threads = new ArrayList<ZooThread>();
+                for (int i = 1; i <= 5; i++) {
+                    ZooThread thread = new ZooThread();
+                    threads.add(thread);
+                    thread.start();
+                }
+                for (ZooThread thread : threads) {
+                    try {
+                        thread.join();
+                    }
+                    catch (InterruptedException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+        finally {
+            try {
+                if (client != null) {
+                    client.close();
+                }
+            }
+            catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
     }
+
+    class ZooThread extends Thread {
+        private Exception m_error;
+
+        public ZooThread() {
+        }
+
+        public void run() {
+            try {
+                System.out.println("Thread " + Thread.currentThread().getId() 
+ " is acquiring write lock.");
+                WriteLock writeLock = new WriteLock(client, 
"/cassandra/keyspace1/schema", null);
+                boolean acquired = writeLock.acquireLock(30000);
+                if (acquired) {
+                    System.out.println("Thread " + 
Thread.currentThread().getId() + " has acquired the write lock.");
+                    System.out.println("Thread " + 
Thread.currentThread().getId()
+                        + " is holding the lock for 5 seconds.");
+                    Thread.sleep(5000);
+                }
+                else {
+                    System.out.println("Thread " + 
Thread.currentThread().getId() + " did not acquire write lock.");
+                }
+                writeLock.unlock();
+                System.out.println("Thread " + Thread.currentThread().getId() 
+ " released the write lock.");
+            }
+            catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
     }
-    
+
     public ZooKeeper getClient() {
         return executor.getClient();
     }
@@ -44,7 +142,5 @@
     public void stop() {
         executor.interrupt();
     }
- 
-    
-}
 
+}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to