Repository: hadoop
Updated Branches:
  refs/heads/trunk 6d2da38d1 -> 3fbf4cd5d


HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency 
problem in contract test setup. Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3fbf4cd5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3fbf4cd5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3fbf4cd5

Branch: refs/heads/trunk
Commit: 3fbf4cd5da13dde68b77e581ea2d4aa564c8c8b7
Parents: 6d2da38
Author: Anu Engineer <aengin...@apache.org>
Authored: Thu Oct 20 12:33:58 2016 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Thu Oct 20 12:33:58 2016 -0700

----------------------------------------------------------------------
 .../AbstractContractRootDirectoryTest.java      |  48 +-
 .../hadoop/fs/contract/ContractTestUtils.java   |   6 +-
 .../org/apache/hadoop/test/LambdaTestUtils.java | 521 +++++++++++++++++++
 .../apache/hadoop/test/TestLambdaTestUtils.java | 395 ++++++++++++++
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java  |  20 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  48 --
 6 files changed, 962 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 0a8f464..5fba4bf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
 public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
+  public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
 
   @Override
   public void setup() throws Exception {
@@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
     // extra sanity checks here to avoid support calls about complete loss
     // of data
     skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
-    Path root = new Path("/");
+    final Path root = new Path("/");
     assertIsDirectory(root);
-    // make sure it is clean
-    FileSystem fs = getFileSystem();
-    deleteChildren(fs, root, true);
-    FileStatus[] children = listChildren(fs, root);
-    if (children.length > 0) {
-      StringBuilder error = new StringBuilder();
-      error.append("Deletion of child entries failed, still have")
-          .append(children.length)
-          .append(System.lineSeparator());
-      for (FileStatus child : children) {
-        error.append("  ").append(child.getPath())
-            .append(System.lineSeparator());
-      }
-      fail(error.toString());
-    }
+    // make sure the directory is clean. This includes some retry logic
+    // to forgive blobstores whose listings can be out of sync with the file
+    // status;
+    final FileSystem fs = getFileSystem();
+    final AtomicInteger iterations = new AtomicInteger(0);
+    final FileStatus[] originalChildren = listChildren(fs, root);
+    LambdaTestUtils.eventually(
+        OBJECTSTORE_RETRY_TIMEOUT,
+        new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            FileStatus[] deleted = deleteChildren(fs, root, true);
+            FileStatus[] children = listChildren(fs, root);
+            if (children.length > 0) {
+              fail(String.format(
+                  "After %d attempts: listing after rm /* not empty"
+                      + "\n%s\n%s\n%s",
+                  iterations.incrementAndGet(),
+                  dumpStats("final", children),
+                  dumpStats("deleted", deleted),
+                  dumpStats("original", originalChildren)));
+            }
+            return null;
+          }
+        },
+        new LambdaTestUtils.ProportionalRetryInterval(50, 1000));
     // then try to delete the empty one
     boolean deleted = fs.delete(root, false);
     LOG.info("rm / of empty dir result is {}", deleted);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 16bfb9a..73c8f1c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
    * @param fileSystem filesystem
    * @param path path to delete
    * @param recursive flag to indicate child entry deletion should be recursive
-   * @return the number of child entries found and deleted (not including
+   * @return the immediate child entries found and deleted (not including
    * any recursive children of those entries)
    * @throws IOException problem in the deletion process.
    */
-  public static int deleteChildren(FileSystem fileSystem,
+  public static FileStatus[] deleteChildren(FileSystem fileSystem,
       Path path,
       boolean recursive) throws IOException {
+    // The listing is taken once, before any deletion; the same array is
+    // what gets returned to the caller.
     FileStatus[] children = listChildren(fileSystem, path);
     for (FileStatus entry : children) {
+      // NOTE(review): the boolean result of delete() is discarded, so an
+      // entry which was listed but not actually deleted is still returned.
       fileSystem.delete(entry.getPath(), recursive);
     }
-    return children.length;
+    return children;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
new file mode 100644
index 0000000..1fa5c3f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Class containing methods and associated classes to make the most of Lambda
+ * expressions in Hadoop tests.
+ *
+ * The code has been designed from the outset to be Java-8 friendly, but
+ * to still be usable in Java 7.
+ *
+ * The code is modelled on
+ * {@code GenericTestUtils#waitFor(Supplier, int, int)},
+ * but also lifts concepts from Scalatest's {@code awaitResult} and
+ * its notion of pluggable retry logic (simple, backoff, maybe even things
+ * with jitter: test author gets to choose).
+ * The {@link #intercept(Class, Callable)} method is also all credit due
+ * Scalatest, though it's been extended to also support a string message
+ * check; useful when checking the contents of the exception.
+ */
+public final class LambdaTestUtils {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(LambdaTestUtils.class);
+
+  /**
+   * Private constructor: this class only declares static members and
+   * nested types, so it is never instantiated.
+   */
+  private LambdaTestUtils() {
+  }
+
+  /**
+   * This is the string included in the assertion text in
+   * {@link #intercept(Class, Callable)} if
+   * the closure returned a null value.
+   */
+  public static final String NULL_RESULT = "(null)";
+
+  /**
+   * Interface to implement for converting a timeout into some form
+   * of exception to raise.
+   * <p>
+   * {@link #await(int, Callable, Callable, TimeoutHandler)} calls the
+   * handler once its probe loop has ended without the condition being met,
+   * and throws whatever exception the handler returns.
+   */
+  public interface TimeoutHandler {
+
+    /**
+     * Create an exception (or throw one, if desired).
+     * @param timeoutMillis timeout which has arisen
+     * @param caught any exception which was caught in the last probe;
+     * may be null, which is the case when the last probe completed
+     * without raising an exception
+     * @return an exception which will then be thrown. If null is returned,
+     * {@code await()} logs an error and falls back to the exception
+     * created by {@link GenerateTimeout}.
+     * @throws Exception if the handler wishes to raise an exception
+     * that way.
+     */
+    Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
+  }
+
+  /**
+   * Wait for a condition to be met, with a retry policy returning the
+   * sleep time before the next attempt is made. If, at the end
+   * of the timeout period, the condition is still false (or failing with
+   * an exception), the timeout handler is invoked, passing in the timeout
+   * and any exception raised in the last invocation. The exception returned
+   * by this timeout handler is then rethrown.
+   * <p>
+   * If {@code retry} returns a negative sleep time, no further probes are
+   * made and the timeout handler is invoked straight away.
+   * <p>
+   * Example: Wait 30s for a condition to be met, with a sleep of 500ms
+   * between each probe.
+   * If the operation is failing, then, after 30s, the timeout handler
+   * is called. This returns the exception passed in (if any),
+   * or generates a new one.
+   * <pre>
+   * await(
+   *   30 * 1000,
+   *   () -> 0 == filesystem.listFiles(new Path("/")).length,
+   *   () -> 500,
+   *   (timeout, ex) -> ex != null ? ex : new TimeoutException("timeout"));
+   * </pre>
+   *
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made.
+   * @param check predicate to evaluate; must not be null
+   * @param retry retry escalation logic: returns the time in milliseconds
+   * to sleep before the next probe; a negative value ends the wait
+   * @param timeoutHandler handler invoked on timeout;
+   * the returned exception will be thrown
+   * @return the number of iterations before the condition was satisfied
+   * @throws Exception the exception returned by {@code timeoutHandler} on
+   * timeout
+   * @throws FailFastException immediately if the evaluated operation raises it
+   * @throws InterruptedException if interrupted.
+   * @throws IllegalArgumentException if {@code timeoutMillis} is negative
+   * @throws NullPointerException if {@code check} or {@code timeoutHandler}
+   * is null
+   */
+  public static int await(int timeoutMillis,
+      Callable<Boolean> check,
+      Callable<Integer> retry,
+      TimeoutHandler timeoutHandler)
+      throws Exception {
+    Preconditions.checkArgument(timeoutMillis >= 0,
+        "timeoutMillis must be >= 0");
+    // Fail fast on a null probe: otherwise the NullPointerException is
+    // raised inside the loop, caught as an ordinary probe failure and
+    // retried until the timeout expires.
+    Preconditions.checkNotNull(check, "null check callable");
+    Preconditions.checkNotNull(timeoutHandler);
+
+    long endTime = Time.now() + timeoutMillis;
+    Exception ex = null;
+    boolean running = true;
+    int iterations = 0;
+    while (running) {
+      iterations++;
+      try {
+        if (check.call()) {
+          return iterations;
+        }
+        // the probe failed but did not raise an exception. Reset any
+        // exception raised by a previous probe failure.
+        ex = null;
+      } catch (InterruptedException | FailFastException e) {
+        // these two exceptions trigger an immediate exit
+        throw e;
+      } catch (Exception e) {
+        LOG.debug("await() iteration {}", iterations, e);
+        ex = e;
+      }
+      running = Time.now() < endTime;
+      if (running) {
+        // the retry policy is only consulted when another probe is due
+        int sleeptime = retry.call();
+        if (sleeptime >= 0) {
+          Thread.sleep(sleeptime);
+        } else {
+          // a negative interval means "stop retrying"
+          running = false;
+        }
+      }
+    }
+    // timeout
+    Exception evaluate = timeoutHandler.evaluate(timeoutMillis, ex);
+    if (evaluate == null) {
+      // bad timeout handler logic; fall back to GenerateTimeout so the
+      // underlying problem isn't lost.
+      LOG.error("timeout handler {} did not throw an exception ",
+          timeoutHandler);
+      evaluate = new GenerateTimeout().evaluate(timeoutMillis, ex);
+    }
+    throw evaluate;
+  }
+
+  /**
+   * Simplified {@link #await(int, Callable, Callable, TimeoutHandler)}
+   * operation: probes are separated by a fixed interval, and a
+   * {@link GenerateTimeout} handler generates a {@code TimeoutException}
+   * if the condition is not met in time.
+   * <p>
+   * Example: await for probe to succeed:
+   * <pre>
+   * await(
+   *   30 * 1000, 500,
+   *   () -> 0 == filesystem.listFiles(new Path("/")).length);
+   * </pre>
+   *
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made.
+   * @param intervalMillis interval in milliseconds between checks
+   * @param check predicate to evaluate
+   * @return the number of iterations before the condition was satisfied
+   * @throws Exception the exception created by {@link GenerateTimeout}
+   * on timeout
+   * @throws FailFastException immediately if the evaluated operation raises it
+   * @throws InterruptedException if interrupted.
+   */
+  public static int await(int timeoutMillis,
+      int intervalMillis,
+      Callable<Boolean> check) throws Exception {
+    final Callable<Integer> fixedInterval =
+        new FixedRetryInterval(intervalMillis);
+    final TimeoutHandler onTimeout = new GenerateTimeout();
+    return await(timeoutMillis, check, fixedInterval, onTimeout);
+  }
+
+  /**
+   * Repeatedly execute a closure until it returns a value rather than
+   * raise an exception.
+   * Exceptions are caught and, with the exceptions of
+   * {@code InterruptedException} and {@link FailFastException},
+   * trigger a sleep and retry. This is similar to ScalaTest's
+   * {@code eventually(timeout, closure)} operation, though that lacks
+   * the ability to fail fast if the inner closure has determined that
+   * a failure condition is non-recoverable.
+   * <p>
+   * If {@code retry} returns a negative sleep time, no further attempts
+   * are made and the last exception is thrown straight away; this matches
+   * {@link #await(int, Callable, Callable, TimeoutHandler)}.
+   * <p>
+   * Example: spin until the number of files in a filesystem is non-zero,
+   * returning the files found.
+   * The sleep interval backs off by 500 ms each iteration to a maximum of 5s.
+   * <pre>
+   * FileStatus[] files = eventually( 30 * 1000,
+   *   () -> {
+   *     FileStatus[] f = filesystem.listFiles(new Path("/"));
+   *     assertNotEquals(0, f.length);
+   *     return f;
+   *   },
+   *   new ProportionalRetryInterval(500, 5000));
+   * </pre>
+   * This allows for a fast exit, yet reduces probe frequency over time.
+   *
+   * @param <T> return type
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made before failing.
+   * @param eval expression to evaluate; must not be null
+   * @param retry retry interval generator: returns the time in milliseconds
+   * to sleep before the next attempt; a negative value ends the retries
+   * @return result of the first successful eval call
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised -without any retry attempt.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   * @throws IllegalArgumentException if {@code timeoutMillis} is negative
+   * @throws NullPointerException if {@code eval} is null
+   */
+  public static <T> T eventually(int timeoutMillis,
+      Callable<T> eval,
+      Callable<Integer> retry) throws Exception {
+    Preconditions.checkArgument(timeoutMillis >= 0,
+        "timeoutMillis must be >= 0");
+    // Fail fast on a null closure: otherwise the NullPointerException is
+    // raised inside the loop, caught as an ordinary failure and retried
+    // until the timeout expires.
+    Preconditions.checkNotNull(eval, "null eval callable");
+    long endTime = Time.now() + timeoutMillis;
+    Exception ex;
+    boolean running;
+    int iterations = 0;
+    do {
+      iterations++;
+      try {
+        return eval.call();
+      } catch (InterruptedException | FailFastException e) {
+        // these two exceptions trigger an immediate exit
+        throw e;
+      } catch (Exception e) {
+        LOG.debug("eventually() iteration {}", iterations, e);
+        ex = e;
+      }
+      running = Time.now() < endTime;
+      if (running) {
+        // the retry policy is only consulted when another attempt is due
+        int sleeptime = retry.call();
+        if (sleeptime >= 0) {
+          Thread.sleep(sleeptime);
+        } else {
+          // a negative interval means "stop retrying"; without this the
+          // loop would spin without sleeping until the timeout.
+          running = false;
+        }
+      }
+    } while (running);
+    // timeout. Throw the last exception raised
+    throw ex;
+  }
+
+  /**
+   * Simplified {@link #eventually(int, Callable, Callable)} method
+   * in which the sleep between attempts is a fixed interval.
+   * <p>
+   * Example: wait 30s until an assertion holds, sleeping 1s between each
+   * check.
+   * <pre>
+   * eventually( 30 * 1000, 1000,
+   *   () -> { assertEquals(0, filesystem.listFiles(new Path("/")).length); }
+   * );
+   * </pre>
+   *
+   * @param <T> return type
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made before failing.
+   * @param intervalMillis interval in milliseconds
+   * @param eval expression to evaluate
+   * @return result of the first successful invocation of {@code eval()}
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised -without any retry attempt.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   */
+  public static <T> T eventually(int timeoutMillis,
+      int intervalMillis,
+      Callable<T> eval) throws Exception {
+    final Callable<Integer> fixedInterval =
+        new FixedRetryInterval(intervalMillis);
+    return eventually(timeoutMillis, eval, fixedInterval);
+  }
+
+  /**
+   * Intercept an exception; throw an {@code AssertionError} if one not raised.
+   * The caught exception is rethrown if it is of the wrong class.
+   * <p>
+   * The "nothing was raised" {@code AssertionError} is created outside the
+   * guarded call, so it is always thrown to the caller, even when
+   * {@code clazz} is {@code Throwable}, {@code Error} or
+   * {@code AssertionError}.
+   * <p>
+   * Example: expect deleting a nonexistent file to raise a
+   * {@code FileNotFoundException}.
+   * <pre>
+   * FileNotFoundException ioe = intercept(FileNotFoundException.class,
+   *   () -> filesystem.delete(new Path("/missing"), false));
+   * </pre>
+   *
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   * The error includes the {@code toString()} value of the result, if this
+   * can be determined.
+   */
+  public static <T, E extends Throwable> E intercept(
+      Class<E> clazz,
+      Callable<T> eval)
+      throws Exception {
+    T result;
+    try {
+      result = eval.call();
+    } catch (Throwable e) {
+      if (clazz.isInstance(e)) {
+        return clazz.cast(e);
+      }
+      // wrong type: rethrow. The only checked type eval.call() can raise
+      // is Exception, so this is covered by the throws clause.
+      throw e;
+    }
+    // Raised outside the try block so that it cannot be mistaken for
+    // an exception thrown by the closure.
+    throw new AssertionError("Expected an exception, got "
+        + robustToString(result));
+  }
+
+  /**
+   * Intercept an exception and verify its text; throw an
+   * {@code AssertionError} if no exception was raised.
+   * The caught exception is rethrown if it is of the wrong class; the
+   * check for the text defined in {@code contained} is delegated to
+   * {@code GenericTestUtils.assertExceptionContains()}.
+   * <p>
+   * Example: expect deleting a nonexistent file to raise a
+   * {@code FileNotFoundException} with the {@code toString()} value
+   * containing the text {@code "missing"}.
+   * <pre>
+   * FileNotFoundException ioe = intercept(FileNotFoundException.class,
+   *   "missing",
+   *   () -> {
+   *     filesystem.delete(new Path("/missing"), false);
+   *   });
+   * </pre>
+   *
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type and contents
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   * The error includes the {@code toString()} value of the result, if this
+   * can be determined.
+   * @see GenericTestUtils#assertExceptionContains(String, Throwable)
+   */
+  public static <T, E extends Throwable> E intercept(
+      Class<E> clazz,
+      String contained,
+      Callable<T> eval)
+      throws Exception {
+    // first check the type, then the text
+    final E caught = intercept(clazz, eval);
+    GenericTestUtils.assertExceptionContains(contained, caught);
+    return caught;
+  }
+
+  /**
+   * Robust string converter for exception messages. If the object's
+   * {@code toString()} method throws an exception, that exception is
+   * caught and logged, and the {@code toString()} value of the object's
+   * class is returned instead.
+   * This stops a {@code toString()} failure hiding underlying problems.
+   * @param o object to stringify; may be null
+   * @return a string for exception messages; {@link #NULL_RESULT} if
+   * {@code o} is null
+   */
+  private static String robustToString(Object o) {
+    if (o == null) {
+      return NULL_RESULT;
+    }
+    String text;
+    try {
+      text = o.toString();
+    } catch (Exception e) {
+      LOG.info("Exception calling toString()", e);
+      text = o.getClass().toString();
+    }
+    return text;
+  }
+
+  /**
+   * Returns {@code TimeoutException} on a timeout. If
+   * there was an inner exception passed in, it is included as the
+   * cause and its string value is appended to the message.
+   */
+  public static class GenerateTimeout implements TimeoutHandler {
+    private final String message;
+
+    /**
+     * Create a handler whose exceptions start with the given text.
+     * @param message text to begin the exception message with
+     */
+    public GenerateTimeout(String message) {
+      this.message = message;
+    }
+
+    /**
+     * Create a handler with the message text {@code "timeout"}.
+     */
+    public GenerateTimeout() {
+      this("timeout");
+    }
+
+    /**
+     * Evaluate operation creates a new {@code TimeoutException}.
+     * @param timeoutMillis timeout in millis
+     * @param caught optional caught exception
+     * @return TimeoutException
+     */
+    @Override
+    public Exception evaluate(int timeoutMillis, Exception caught)
+        throws Exception {
+      StringBuilder text = new StringBuilder(
+          String.format("%s: after %d millis", message, timeoutMillis));
+      if (caught != null) {
+        text.append("; ").append(robustToString(caught));
+      }
+      TimeoutException timeout = new TimeoutException(text.toString());
+      // a null cause is permitted: it leaves the cause unknown
+      timeout.initCause(caught);
+      return timeout;
+    }
+  }
+
+  /**
+   * Retry policy which returns the same sleep interval on every call,
+   * counting how often it has been invoked.
+   */
+  public static class FixedRetryInterval implements Callable<Integer> {
+    private final int intervalMillis;
+    private int invocationCount = 0;
+
+    /**
+     * @param intervalMillis interval in milliseconds; must be positive
+     */
+    public FixedRetryInterval(int intervalMillis) {
+      Preconditions.checkArgument(intervalMillis > 0);
+      this.intervalMillis = intervalMillis;
+    }
+
+    /**
+     * Record the invocation and return the fixed interval.
+     * @return the interval in milliseconds
+     */
+    @Override
+    public Integer call() throws Exception {
+      invocationCount += 1;
+      return intervalMillis;
+    }
+
+    /**
+     * @return the number of times {@link #call()} has been invoked
+     */
+    public int getInvocationCount() {
+      return invocationCount;
+    }
+
+    @Override
+    public String toString() {
+      return "FixedRetryInterval{"
+          + "interval=" + intervalMillis
+          + ", invocationCount=" + invocationCount
+          + '}';
+    }
+  }
+
+  /**
+   * Gradually increase the sleep time by the initial interval, until
+   * the limit set by {@code maxIntervalMillis} is reached.
+   * The interval returned never exceeds that limit, even when the limit
+   * is not a multiple of the initial interval or is smaller than it.
+   */
+  public static class ProportionalRetryInterval implements Callable<Integer> {
+    private final int intervalMillis;
+    private final int maxIntervalMillis;
+    private int current;
+    private int invocationCount = 0;
+
+    /**
+     * @param intervalMillis initial interval, and the increment added
+     * after each call, in milliseconds; must be positive
+     * @param maxIntervalMillis upper limit on the interval returned,
+     * in milliseconds; must be positive
+     */
+    public ProportionalRetryInterval(int intervalMillis,
+        int maxIntervalMillis) {
+      Preconditions.checkArgument(intervalMillis > 0);
+      Preconditions.checkArgument(maxIntervalMillis > 0);
+      this.intervalMillis = intervalMillis;
+      this.maxIntervalMillis = maxIntervalMillis;
+      // the very first interval must also respect the limit
+      this.current = Math.min(intervalMillis, maxIntervalMillis);
+    }
+
+    /**
+     * Return the current interval, then advance it by the increment,
+     * capped at the limit.
+     * @return the interval to sleep for, in milliseconds
+     */
+    @Override
+    public Integer call() throws Exception {
+      invocationCount++;
+      int last = current;
+      // Widen to long so the addition cannot overflow, then clamp to the
+      // limit so that the next value returned never exceeds it.
+      current = (int) Math.min((long) current + intervalMillis,
+          (long) maxIntervalMillis);
+      return last;
+    }
+
+    /**
+     * @return the number of times {@link #call()} has been invoked
+     */
+    public int getInvocationCount() {
+      return invocationCount;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "ProportionalRetryInterval{");
+      sb.append("interval=").append(intervalMillis);
+      sb.append(", current=").append(current);
+      sb.append(", limit=").append(maxIntervalMillis);
+      sb.append(", invocationCount=").append(invocationCount);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * An exception which triggers a fast exit from the
+   * {@link #eventually(int, Callable, Callable)} and
+   * {@link #await(int, Callable, Callable, TimeoutHandler)} loops:
+   * both rethrow it immediately rather than retrying.
+   */
+  public static class FailFastException extends Exception {
+
+    /**
+     * @param detailMessage exception message
+     */
+    public FailFastException(String detailMessage) {
+      super(detailMessage);
+    }
+
+    /**
+     * @param message exception message
+     * @param cause underlying cause
+     */
+    public FailFastException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    /**
+     * Instantiate from a format string.
+     * @param format format string
+     * @param args arguments to format
+     * @return an instance with the message string constructed.
+     */
+    public static FailFastException newInstance(String format, Object...args) {
+      final String text = String.format(format, args);
+      return new FailFastException(text);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
new file mode 100644
index 0000000..d3d5cb4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.test.GenericTestUtils.*;
+
+/**
+ * Test the logic in {@link LambdaTestUtils}.
+ * This test suite includes Java 8 and Java 7 code; the Java 8 code exists
+ * to verify that the API is easily used with Lambda expressions.
+ */
+public class TestLambdaTestUtils extends Assert {
+
+  public static final int INTERVAL = 10;
+  public static final int TIMEOUT = 50;
+  private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
+  // counter for lambda expressions to use
+  private int count;
+
+  /**
+   * Always evaluates to true.
+   */
+  public static final Callable<Boolean> ALWAYS_TRUE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return true;
+        }
+      };
+
+  /**
+   * Always evaluates to false.
+   */
+  public static final Callable<Boolean> ALWAYS_FALSE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return false;
+        }
+      };
+
+  /**
+   * Text in the raised FNFE.
+   */
+  public static final String MISSING = "not found";
+
+  /**
+   * A predicate that always throws a FileNotFoundException.
+   */
+  public static final Callable<Boolean> ALWAYS_FNFE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          throw new FileNotFoundException(MISSING);
+        }
+      };
+
+  /**
+   * reusable timeout handler.
+   */
+  public static final GenerateTimeout
+      TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
+
+  /**
+   * Always evaluates to 3L.
+   */
+  public static final Callable<Long> EVAL_3L = new Callable<Long>() {
+    @Override
+    public Long call() throws Exception {
+      return 3L;
+    }
+  };
+
+  /**
+   * Always raises a {@code FileNotFoundException}.
+   */
+  public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
+    @Override
+    public Long call() throws Exception {
+      throw new FileNotFoundException(MISSING);
+    }
+  };
+
+  /**
+   * Assert the retry count is as expected.
+   * @param expected expected value
+   */
+  protected void assertRetryCount(int expected) {
+    assertEquals(retry.toString(), expected, retry.getInvocationCount());
+  }
+
+  /**
+   * Assert the retry count is at least the given minimum.
+   * @param minCount minimum value
+   */
+  protected void assertMinRetryCount(int minCount) {
+    assertTrue("retry count of " + retry + " is not >= " + minCount,
+        minCount <= retry.getInvocationCount());
+  }
+
+  @Test
+  public void testAwaitAlwaysTrue() throws Throwable {
+    await(TIMEOUT,
+        ALWAYS_TRUE,
+        new FixedRetryInterval(INTERVAL),
+        TIMEOUT_FAILURE_HANDLER);
+  }
+
+  @Test
+  public void testAwaitAlwaysFalse() throws Throwable {
+    try {
+      await(TIMEOUT,
+          ALWAYS_FALSE,
+          retry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      assertTrue(retry.getInvocationCount() > 4);
+    }
+  }
+
+  @Test
+  public void testAwaitLinearRetry() throws Throwable {
+    ProportionalRetryInterval linearRetry =
+        new ProportionalRetryInterval(INTERVAL * 2, TIMEOUT * 2);
+    try {
+      await(TIMEOUT,
+          ALWAYS_FALSE,
+          linearRetry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      assertEquals(linearRetry.toString(),
+          2, linearRetry.getInvocationCount());
+    }
+  }
+
+  @Test
+  public void testAwaitFNFE() throws Throwable {
+    try {
+      await(TIMEOUT,
+          ALWAYS_FNFE,
+          retry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      // inner cause is included
+      assertTrue(retry.getInvocationCount() > 0);
+      assertTrue(e.getCause() instanceof FileNotFoundException);
+      assertExceptionContains(MISSING, e);
+    }
+  }
+
+  @Test
+  public void testRetryInterval() throws Throwable {
+    ProportionalRetryInterval interval =
+        new ProportionalRetryInterval(200, 1000);
+    assertEquals(200, (int) interval.call());
+    assertEquals(400, (int) interval.call());
+    assertEquals(600, (int) interval.call());
+    assertEquals(800, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+  }
+
+  @Test
+  public void testInterceptSuccess() throws Throwable {
+    IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
+    assertExceptionContains(MISSING, ioe);
+  }
+
+  @Test
+  public void testInterceptContains() throws Throwable {
+    intercept(IOException.class, MISSING, ALWAYS_FNFE);
+  }
+
+  @Test
+  public void testInterceptContainsWrongString() throws Throwable {
+    try {
+      FileNotFoundException e =
+          intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
+      assertNotNull(e);
+      throw e;
+    } catch (AssertionError expected) {
+      assertExceptionContains(MISSING, expected);
+    }
+  }
+
+  @Test
+  public void testInterceptVoidCallable() throws Throwable {
+    intercept(AssertionError.class,
+        NULL_RESULT,
+        new Callable<IOException>() {
+          @Override
+          public IOException call() throws Exception {
+            return intercept(IOException.class,
+                new Callable<Void>() {
+                  @Override
+                  public Void call() throws Exception {
+                    return null;
+                  }
+                });
+          }
+        });
+  }
+
+  @Test
+  public void testEventually() throws Throwable {
+    long result = eventually(TIMEOUT, EVAL_3L, retry);
+    assertEquals(3, result);
+    assertEquals(0, retry.getInvocationCount());
+  }
+
+  @Test
+  public void testEventuallyFailuresRetry() throws Throwable {
+    try {
+      eventually(TIMEOUT, EVAL_FNFE, retry);
+      fail("should not have got here");
+    } catch (IOException expected) {
+      // expected
+      assertMinRetryCount(1);
+    }
+  }
+
+  /*
+   * Java 8 Examples go below this line.
+   */
+
+  @Test
+  public void testInterceptFailure() throws Throwable {
+    try {
+      IOException ioe = intercept(IOException.class, () -> "hello");
+      assertNotNull(ioe);
+      throw ioe;
+    } catch (AssertionError expected) {
+      assertExceptionContains("hello", expected);
+    }
+  }
+
+  @Test
+  public void testInterceptInterceptLambda() throws Throwable {
+    // here we use intercept() to test itself.
+    intercept(AssertionError.class,
+        MISSING,
+        () -> intercept(FileNotFoundException.class, "404", ALWAYS_FNFE));
+  }
+
+  @Test
+  public void testInterceptInterceptVoidResultLambda() throws Throwable {
+    // see what happens when a null is returned; type inference -> Void
+    intercept(AssertionError.class,
+        NULL_RESULT,
+        () -> intercept(IOException.class, () -> null));
+  }
+
+  @Test
+  public void testInterceptInterceptStringResultLambda() throws Throwable {
+    // see what happens when a string is returned; it should be used
+    // in the message
+    intercept(AssertionError.class,
+        "hello, world",
+        () -> intercept(IOException.class,
+            () -> "hello, world"));
+  }
+
+  @Test
+  public void testAwaitNoTimeoutLambda() throws Throwable {
+    await(0,
+        () -> true,
+        retry,
+        (timeout, ex) -> ex != null ? ex : new Exception("timeout"));
+    assertRetryCount(0);
+  }
+
+  @Test
+  public void testAwaitLambdaRepetitions() throws Throwable {
+    count = 0;
+
+    // lambda expression which will succeed after exactly 4 probes
+    int reps = await(TIMEOUT,
+        () -> ++count == 4,
+        () -> 10,
+        (timeout, ex) -> ex != null ? ex : new Exception("timeout"));
+    assertEquals(4, reps);
+  }
+
+  @Test
+  public void testInterceptAwaitLambdaException() throws Throwable {
+    count = 0;
+    IOException ioe = intercept(IOException.class,
+        () -> await(
+            TIMEOUT,
+            () -> {
+              throw new IOException("inner " + ++count);
+            },
+            retry,
+            (timeout, ex) -> ex));
+    assertRetryCount(count - 1);
+    // verify that the exception returned was the last one raised
+    assertExceptionContains(Integer.toString(count), ioe);
+  }
+
+  @Test
+  public void testInterceptAwaitLambdaDiagnostics() throws Throwable {
+    intercept(IOException.class, "generated",
+        () -> await(5,
+            () -> false,
+            () -> -1,  // force checks -1 timeout probes
+            (timeout, ex) -> new IOException("generated")));
+  }
+
+  @Test
+  public void testInterceptAwaitFailFastLambda() throws Throwable {
+    intercept(FailFastException.class,
+        () -> await(TIMEOUT,
+            () -> {
+              throw new FailFastException("ffe");
+            },
+            retry,
+            (timeout, ex) -> ex));
+    assertRetryCount(0);
+  }
+
+  @Test
+  public void testEventuallyOnceLambda() throws Throwable {
+    String result = eventually(0, () -> "hello", retry);
+    assertEquals("hello", result);
+    assertEquals(0, retry.getInvocationCount());
+  }
+
+  @Test
+  public void testEventuallyLambda() throws Throwable {
+    long result = eventually(TIMEOUT, () -> 3, retry);
+    assertEquals(3, result);
+    assertRetryCount(0);
+  }
+
+  @Test
+  public void testInterceptEventuallyLambdaFailures() throws Throwable {
+    intercept(IOException.class,
+        "oops",
+        () -> eventually(TIMEOUT,
+            () -> {
+              throw new IOException("oops");
+            },
+            retry));
+    assertMinRetryCount(1);
+  }
+
+  @Test
+  public void testInterceptEventuallyambdaFailuresNegativeRetry()
+      throws Throwable {
+    intercept(FileNotFoundException.class,
+        () -> eventually(TIMEOUT, EVAL_FNFE, () -> -1));
+  }
+
+  @Test
+  public void testInterceptEventuallyLambdaFailFast() throws Throwable {
+    intercept(FailFastException.class, "oops",
+        () -> eventually(
+            TIMEOUT,
+            () -> {
+              throw new FailFastException("oops");
+            },
+            retry));
+    assertRetryCount(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 0686488..e284ea7 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.test.LambdaTestUtils;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +36,6 @@ import java.io.FileNotFoundException;
 import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
  * Test S3A Failure translation, including a functional test
@@ -68,13 +68,15 @@ public class ITestS3AFailureHandling extends 
AbstractFSContractTestBase {
       writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, 
true);
       // here the file length is less. Probe the file to see if this is true,
       // with a spin and wait
-      eventually(30 *1000, new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
-          return null;
-        }
-      });
+      LambdaTestUtils.eventually(30 * 1000, 1000,
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
+              return null;
+            }
+          });
+
       // here length is shorter. Assuming it has propagated to all replicas,
       // the position of the input stream is now beyond the EOF.
       // An attempt to seek backwards to a position greater than the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3fbf4cd5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index c67e118..19dccac 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
@@ -136,32 +135,6 @@ public class S3ATestUtils {
   }
 
   /**
-   * Repeatedly attempt a callback until timeout or a {@link FailFastException}
-   * is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
-   * @param timeout timeout
-   * @param callback callback to invoke
-   * @throws FailFastException any fast-failure
-   * @throws Exception the exception which caused the iterator to fail
-   */
-  public static void eventually(int timeout, Callable<Void> callback)
-      throws Exception {
-    Exception lastException;
-    long endtime = System.currentTimeMillis() + timeout;
-    do {
-      try {
-        callback.call();
-        return;
-      } catch (InterruptedException | FailFastException e) {
-        throw e;
-      } catch (Exception e) {
-        lastException = e;
-      }
-      Thread.sleep(500);
-    } while (endtime > System.currentTimeMillis());
-    throw lastException;
-  }
-
-  /**
    * patch the endpoint option so that irrespective of where other tests
    * are working, the IO performance tests can work with the landsat
    * images.
@@ -291,27 +264,6 @@ public class S3ATestUtils {
   }
 
   /**
-   * The exception to raise so as to exit fast from
-   * {@link #eventually(int, Callable)}.
-   */
-  public static class FailFastException extends Exception {
-    public FailFastException() {
-    }
-
-    public FailFastException(String message) {
-      super(message);
-    }
-
-    public FailFastException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    public FailFastException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  /**
    * Verify the class of an exception. If it is not as expected, rethrow it.
    * Comparison is on the exact class, not subclass-of inference as
    * offered by {@code instanceof}.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to