HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+. 
(rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f013303
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f013303
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f013303

Branch: refs/heads/HDFS-7285
Commit: 6f0133039a064ca82363ac6f29fb255506f31b8a
Parents: 0d6af57
Author: Robert Kanter <rkan...@apache.org>
Authored: Fri Feb 20 19:47:28 2015 -0800
Committer: Robert Kanter <rkan...@apache.org>
Committed: Fri Feb 20 19:47:28 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../apache/hadoop/util/curator/ChildReaper.java | 234 +++++++++++++++++++
 .../hadoop/util/curator/TestChildReaper.java    | 208 +++++++++++++++++
 3 files changed, 445 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f013303/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index b09868a..143692e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -981,6 +981,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11604. Prevent ConcurrentModificationException while closing domain
     sockets during shutdown of DomainSocketWatcher thread. (cnauroth)
 
+    HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+.
+    (rkanter)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f013303/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
new file mode 100644
index 0000000..3bff187
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.curator.utils.PathUtils;
+
+/**
+ * This is a copy of Curator 2.7.1's ChildReaper class, modified to work with
+ * Guava 11.0.2.  The problem is the 'paths' Collection, which calls Guava's
+ * Sets.newConcurrentHashSet(), which was added in Guava 15.0.
+ * <p>
+ * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
+ * the node and adds empty nodes to an internally managed {@link Reaper}
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ChildReaper implements Closeable
+{
+  private final Logger log = LoggerFactory.getLogger(getClass());
+  private final Reaper reaper;
+  private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+  private final CuratorFramework client;
+  private final Collection<String> paths = newConcurrentHashSet();
+  private final Reaper.Mode mode;
+  private final CloseableScheduledExecutorService executor;
+  private final int reapingThresholdMs;
+
+  private volatile Future<?> task;
+
+  // This is copied from Curator's Reaper class
+  static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
+
+  // This is copied from Guava
+  /**
+   * Creates a thread-safe set backed by a hash map. The set is backed by a
+   * {@link ConcurrentHashMap} instance, and thus carries the same concurrency
+   * guarantees.
+   *
+   * <p>Unlike {@code HashSet}, this class does NOT allow {@code null} to be
+   * used as an element. The set is serializable.
+   *
+   * @return a new, empty thread-safe {@code Set}
+   * @since 15.0
+   */
+  public static <E> Set<E> newConcurrentHashSet() {
+    return Sets.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
+  }
+
+  private enum State
+  {
+    LATENT,
+    STARTED,
+    CLOSED
+  }
+
+  /**
+   * @param client the client
+   * @param path path to reap children from
+   * @param mode reaping mode
+   */
+  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
+  {
+    this(client, path, mode, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
+  }
+
+  /**
+   * @param client the client
+   * @param path path to reap children from
+   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+   * @param mode reaping mode
+   */
+  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
+  {
+    this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
+  }
+
+  /**
+   * @param client the client
+   * @param path path to reap children from
+   * @param executor executor to use for background tasks
+   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+   * @param mode reaping mode
+   */
+  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
+  {
+    this(client, path, mode, executor, reapingThresholdMs, null);
+  }
+
+  /**
+   * @param client the client
+   * @param path path to reap children from
+   * @param executor executor to use for background tasks
+   * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+   * @param mode reaping mode
+   * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
+   */
+  public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+  {
+    this.client = client;
+    this.mode = mode;
+    this.executor = new CloseableScheduledExecutorService(executor);
+    this.reapingThresholdMs = reapingThresholdMs;
+    this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+    addPath(path);
+  }
+
+  /**
+   * The reaper must be started
+   *
+   * @throws Exception errors
+   */
+  public void start() throws Exception
+  {
+    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+
+    task = executor.scheduleWithFixedDelay
+        (
+            new Runnable()
+            {
+              @Override
+              public void run()
+              {
+                doWork();
+              }
+            },
+            reapingThresholdMs,
+            reapingThresholdMs,
+            TimeUnit.MILLISECONDS
+        );
+
+    reaper.start();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+    {
+      CloseableUtils.closeQuietly(reaper);
+      task.cancel(true);
+    }
+  }
+
+  /**
+   * Add a path to reap children from
+   *
+   * @param path the path
+   * @return this for chaining
+   */
+  public ChildReaper addPath(String path)
+  {
+    paths.add(PathUtils.validatePath(path));
+    return this;
+  }
+
+  /**
+   * Remove a path from reaping
+   *
+   * @param path the path
+   * @return true if the path existed and was removed
+   */
+  public boolean removePath(String path)
+  {
+    return paths.remove(PathUtils.validatePath(path));
+  }
+
+  private static ScheduledExecutorService newExecutorService()
+  {
+    return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
+  }
+
+  private void doWork()
+  {
+    for ( String path : paths )
+    {
+      try
+      {
+        List<String> children = client.getChildren().forPath(path);
+        for ( String name : children )
+        {
+          String thisPath = ZKPaths.makePath(path, name);
+          Stat stat = client.checkExists().forPath(thisPath);
+          if ( (stat != null) && (stat.getNumChildren() == 0) )
+          {
+            reaper.addPath(thisPath, mode);
+          }
+        }
+      }
+      catch ( Exception e )
+      {
+        log.error("Could not get children for path: " + path, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f013303/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
new file mode 100644
index 0000000..11b254f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.BindException;
+import java.util.Random;
+
+/**
+ * This is a copy of Curator 2.7.1's TestChildReaper class, with minor
+ * modifications to make it work with JUnit (some setup code taken from
+ * Curator's BaseClassForTests).  This is to ensure that the ChildReaper
+ * class we modified is still correct.
+ */
+public class TestChildReaper
+{
+  protected TestingServer server;
+
+  @Before
+  public void setup() throws Exception {
+    while(this.server == null) {
+      try {
+        this.server = new TestingServer();
+      } catch (BindException var2) {
+        System.err.println("Getting bind exception - retrying to allocate server");
+        this.server = null;
+      }
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.server.close();
+    this.server = null;
+  }
+
+  @Test
+  public void     testSomeNodes() throws Exception
+  {
+
+    Timing                  timing = new Timing();
+    ChildReaper             reaper = null;
+    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+    try
+    {
+      client.start();
+
+      Random              r = new Random();
+      int                 nonEmptyNodes = 0;
+      for ( int i = 0; i < 10; ++i )
+      {
+        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+        if ( r.nextBoolean() )
+        {
+          client.create().forPath("/test/" + Integer.toString(i) + "/foo");
+          ++nonEmptyNodes;
+        }
+      }
+
+      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+      reaper.start();
+
+      timing.forWaiting().sleepABit();
+
+      Stat    stat = client.checkExists().forPath("/test");
+      Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
+    }
+    finally
+    {
+      CloseableUtils.closeQuietly(reaper);
+      CloseableUtils.closeQuietly(client);
+    }
+  }
+
+  @Test
+  public void     testSimple() throws Exception
+  {
+    Timing                  timing = new Timing();
+    ChildReaper             reaper = null;
+    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+    try
+    {
+      client.start();
+
+      for ( int i = 0; i < 10; ++i )
+      {
+        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+      }
+
+      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+      reaper.start();
+
+      timing.forWaiting().sleepABit();
+
+      Stat    stat = client.checkExists().forPath("/test");
+      Assert.assertEquals(stat.getNumChildren(), 0);
+    }
+    finally
+    {
+      CloseableUtils.closeQuietly(reaper);
+      CloseableUtils.closeQuietly(client);
+    }
+  }
+
+  @Test
+  public void     testMultiPath() throws Exception
+  {
+    Timing                  timing = new Timing();
+    ChildReaper             reaper = null;
+    CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+    try
+    {
+      client.start();
+
+      for ( int i = 0; i < 10; ++i )
+      {
+        client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
+        client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
+        client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
+      }
+
+      reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+      reaper.start();
+      reaper.addPath("/test1");
+
+      timing.forWaiting().sleepABit();
+
+      Stat    stat = client.checkExists().forPath("/test1");
+      Assert.assertEquals(stat.getNumChildren(), 0);
+      stat = client.checkExists().forPath("/test2");
+      Assert.assertEquals(stat.getNumChildren(), 0);
+      stat = client.checkExists().forPath("/test3");
+      Assert.assertEquals(stat.getNumChildren(), 10);
+    }
+    finally
+    {
+      CloseableUtils.closeQuietly(reaper);
+      CloseableUtils.closeQuietly(client);
+    }
+  }
+
+  @Test
+  public void     testNamespace() throws Exception
+  {
+    Timing                  timing = new Timing();
+    ChildReaper             reaper = null;
+    CuratorFramework        client = CuratorFrameworkFactory.builder()
+        .connectString(server.getConnectString())
+        .sessionTimeoutMs(timing.session())
+        .connectionTimeoutMs(timing.connection())
+        .retryPolicy(new RetryOneTime(1))
+        .namespace("foo")
+        .build();
+    try
+    {
+      client.start();
+
+      for ( int i = 0; i < 10; ++i )
+      {
+        client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+      }
+
+      reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+      reaper.start();
+
+      timing.forWaiting().sleepABit();
+
+      Stat    stat = client.checkExists().forPath("/test");
+      Assert.assertEquals(stat.getNumChildren(), 0);
+
+      stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
+      Assert.assertNotNull(stat);
+      Assert.assertEquals(stat.getNumChildren(), 0);
+    }
+    finally
+    {
+      CloseableUtils.closeQuietly(reaper);
+      CloseableUtils.closeQuietly(client);
+    }
+  }
+}

Reply via email to