Repository: beam
Updated Branches:
  refs/heads/master 055f452c0 -> 51820cbe0


Captures assertion site and message in PAssert

This makes PAssert failures quite a bit easier to debug.
Example message after this commit:

java.lang.AssertionError: Some message

        at 
org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:384)
        at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:279)
        at 
org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestamped(SplittableDoFnTest.java:234)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        ...
Caused by: java.lang.AssertionError:
Expected: iterable over [<TimestampedValue(KV{z, 0}, 
2017-01-10T00:38:28.000Z)>, <TimestampedValue(KV{bb, 0}, 
2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{bb, 1}, 
2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{ccccc, 0}, 
2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 1}, 
2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 2}, 
2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 3}, 
2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 4}, 
2017-01-10T00:38:30.000Z)>] in any order
     but: Not matched: <TimestampedValue(KV{a, 0}, 2017-01-10T00:38:28.000Z)>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.junit.Assert.assertThat(Assert.java:956)
        at org.junit.Assert.assertThat(Assert.java:923)
        at 
org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1270)
        at 
org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1)
        ...

(as opposed to, basically, just the "Caused by" part)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c62611c7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c62611c7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c62611c7

Branch: refs/heads/master
Commit: c62611c73ab0f9a5769f3ee9b28b11e917628f78
Parents: b81bd25
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Mon Jan 9 16:38:50 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Jan 10 21:45:21 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 163 +++++++++++++++----
 .../apache/beam/sdk/testing/PAssertTest.java    |  44 +++++
 2 files changed, 174 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c62611c7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b57f4a9..89d6fea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -275,8 +275,13 @@ public class PAssert {
   /**
    * Constructs an {@link IterableAssert} for the elements of the provided 
{@link PCollection}.
    */
+  public static <T> IterableAssert<T> that(String message, PCollection<T> 
actual) {
+    return new PCollectionContentsAssert<>(PAssertionSite.capture(message), 
actual);
+  }
+
+  /** @see #that(String, PCollection) */
   public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new PCollectionContentsAssert<>(actual);
+    return that("", actual);
   }
 
   /**
@@ -284,7 +289,7 @@ public class PAssert {
    * must contain a single {@code Iterable<T>} value.
    */
   public static <T> IterableAssert<T> thatSingletonIterable(
-      PCollection<? extends Iterable<T>> actual) {
+      String message, PCollection<? extends Iterable<T>> actual) {
 
     try {
     } catch (NoSuchElementException | IllegalArgumentException exc) {
@@ -297,15 +302,29 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) 
actual;
 
-    return new PCollectionSingletonIterableAssert<>(actualIterables);
+    return new PCollectionSingletonIterableAssert<>(
+        PAssertionSite.capture(message), actualIterables);
   }
 
-  /**
-   * Constructs a {@link SingletonAssert} for the value of the provided
-   * {@code PCollection PCollection<T>}, which must be a singleton.
-   */
+  /** @see #thatSingletonIterable(String, PCollection)  */
+  public static <T> IterableAssert<T> thatSingletonIterable(
+      PCollection<? extends Iterable<T>> actual) {
+    return thatSingletonIterable("", actual);
+  }
+
+    /**
+     * Constructs a {@link SingletonAssert} for the value of the provided
+     * {@code PCollection PCollection<T>}, which must be a singleton.
+     */
+  public static <T> SingletonAssert<T> thatSingleton(String message, 
PCollection<T> actual) {
+    return new PCollectionViewAssert<>(
+        PAssertionSite.capture(message),
+        actual, View.<T>asSingleton(), actual.getCoder());
+  }
+
+  /** @see #thatSingleton(String, PCollection) */
   public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), 
actual.getCoder());
+    return thatSingleton("", actual);
   }
 
   /**
@@ -315,48 +334,90 @@ public class PAssert {
    * {@code Coder<K, V>}.
    */
   public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
+      String message,
       PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
     return new PCollectionViewAssert<>(
+        PAssertionSite.capture(message),
         actual,
         View.<K, V>asMultimap(),
         MapCoder.of(kvCoder.getKeyCoder(), 
IterableCoder.of(kvCoder.getValueCoder())));
   }
 
+  /** @see #thatMultimap(String, PCollection) */
+  public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
+      PCollection<KV<K, V>> actual) {
+    return thatMultimap("", actual);
+  }
+
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link 
PCollection}, which
    * must have at most one value per key.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder}, not 
just any
-   * {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not 
just any {@code Coder<K,
+   * V>}.
    */
-  public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, 
V>> actual) {
+  public static <K, V> SingletonAssert<Map<K, V>> thatMap(
+      String message, PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
     return new PCollectionViewAssert<>(
-        actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), 
kvCoder.getValueCoder()));
+        PAssertionSite.capture(message),
+        actual,
+        View.<K, V>asMap(),
+        MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+  }
+
+  /** @see #thatMap(String, PCollection) */
+  public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, 
V>> actual) {
+    return thatMap("", actual);
   }
 
   ////////////////////////////////////////////////////////////
 
+  private static class PAssertionSite implements Serializable {
+    private final String message;
+    private final StackTraceElement[] creationStackTrace;
+
+    static PAssertionSite capture(String message) {
+      return new PAssertionSite(message, new Throwable().getStackTrace());
+    }
+
+    PAssertionSite(String message, StackTraceElement[] creationStackTrace) {
+      this.message = message;
+      this.creationStackTrace = creationStackTrace;
+    }
+
+    public AssertionError wrap(Throwable t) {
+      AssertionError res =
+          new AssertionError(
+              message.isEmpty() ? t.getMessage() : (message + ": " + 
t.getMessage()), t);
+      res.setStackTrace(creationStackTrace);
+      return res;
+    }
+  }
+
   /**
    * An {@link IterableAssert} about the contents of a {@link PCollection}. 
This does not require
    * the runner to support side inputs.
    */
   private static class PCollectionContentsAssert<T> implements 
IterableAssert<T> {
+    private final PAssertionSite site;
     private final PCollection<T> actual;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, 
Iterable<T>> paneExtractor;
 
-    public PCollectionContentsAssert(PCollection<T> actual) {
-      this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes());
+    public PCollectionContentsAssert(PAssertionSite site, PCollection<T> 
actual) {
+      this(site, actual, IntoGlobalWindow.<T>of(), 
PaneExtractors.<T>allPanes());
     }
 
     public PCollectionContentsAssert(
+        PAssertionSite site,
         PCollection<T> actual,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
paneExtractor) {
+      this.site = site;
       this.actual = actual;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
@@ -394,7 +455,7 @@ public class PAssert {
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
       return new PCollectionContentsAssert<>(
-          actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor);
+          site, actual, IntoStaticWindows.<T>of(windowCoder, window), 
paneExtractor);
     }
 
     /**
@@ -429,7 +490,7 @@ public class PAssert {
         SerializableFunction<Iterable<T>, Void> checkerFn) {
       actual.apply(
           nextAssertionName(),
-          new GroupThenAssert<>(checkerFn, rewindowingStrategy, 
paneExtractor));
+          new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, 
paneExtractor));
       return this;
     }
 
@@ -471,7 +532,7 @@ public class PAssert {
           (SerializableFunction) new MatcherCheckerFn<>(matcher);
       actual.apply(
           "PAssert$" + (assertCount++),
-          new GroupThenAssert<>(checkerFn, rewindowingStrategy, 
paneExtractor));
+          new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, 
paneExtractor));
       return this;
     }
 
@@ -518,21 +579,26 @@ public class PAssert {
    * This does not require the runner to support side inputs.
    */
   private static class PCollectionSingletonIterableAssert<T> implements 
IterableAssert<T> {
+    private final PAssertionSite site;
     private final PCollection<Iterable<T>> actual;
     private final Coder<T> elementCoder;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
         paneExtractor;
 
-    public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) 
{
-      this(actual, IntoGlobalWindow.<Iterable<T>>of(), 
PaneExtractors.<Iterable<T>>onlyPane());
+    public PCollectionSingletonIterableAssert(
+        PAssertionSite site, PCollection<Iterable<T>> actual) {
+      this(
+          site, actual, IntoGlobalWindow.<Iterable<T>>of(), 
PaneExtractors.<Iterable<T>>onlyPane());
     }
 
     public PCollectionSingletonIterableAssert(
+        PAssertionSite site,
         PCollection<Iterable<T>> actual,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
             paneExtractor) {
+      this.site = site;
       this.actual = actual;
 
       @SuppressWarnings("unchecked")
@@ -576,7 +642,7 @@ public class PAssert {
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
       return new PCollectionSingletonIterableAssert<>(
-          actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), 
paneExtractor);
+          site, actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, 
window), paneExtractor);
     }
 
     @Override
@@ -600,7 +666,7 @@ public class PAssert {
         SerializableFunction<Iterable<T>, Void> checkerFn) {
       actual.apply(
           "PAssert$" + (assertCount++),
-          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, 
paneExtractor));
+          new GroupThenAssertForSingleton<>(site, checkerFn, 
rewindowingStrategy, paneExtractor));
       return this;
     }
 
@@ -617,6 +683,7 @@ public class PAssert {
    * of type {@code ViewT}. This requires side input support from the runner.
    */
   private static class PCollectionViewAssert<ElemT, ViewT> implements 
SingletonAssert<ViewT> {
+    private final PAssertionSite site;
     private final PCollection<ElemT> actual;
     private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
     private final AssertionWindows rewindowActuals;
@@ -625,18 +692,27 @@ public class PAssert {
     private final Coder<ViewT> coder;
 
     protected PCollectionViewAssert(
+        PAssertionSite site,
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         Coder<ViewT> coder) {
-      this(actual, view, IntoGlobalWindow.<ElemT>of(), 
PaneExtractors.<ElemT>onlyPane(), coder);
+      this(
+          site,
+          actual,
+          view,
+          IntoGlobalWindow.<ElemT>of(),
+          PaneExtractors.<ElemT>onlyPane(),
+          coder);
     }
 
     private PCollectionViewAssert(
+        PAssertionSite site,
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         AssertionWindows rewindowActuals,
         SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> 
paneExtractor,
         Coder<ViewT> coder) {
+      this.site = site;
       this.actual = actual;
       this.view = view;
       this.rewindowActuals = rewindowActuals;
@@ -663,6 +739,7 @@ public class PAssert {
         BoundedWindow window,
         SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> 
paneExtractor) {
       return new PCollectionViewAssert<>(
+          site,
           actual,
           view,
           IntoStaticWindows.of(
@@ -689,6 +766,7 @@ public class PAssert {
           .apply(
               "PAssert$" + (assertCount++),
               new OneSideInputAssert<ViewT>(
+                  site,
                   CreateActual.from(actual, rewindowActuals, paneExtractor, 
view),
                   rewindowActuals.<Integer>windowDummy(),
                   checkerFn));
@@ -911,14 +989,17 @@ public class PAssert {
    */
   public static class GroupThenAssert<T> extends PTransform<PCollection<T>, 
PDone>
       implements Serializable {
+    private final PAssertionSite site;
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, 
Iterable<T>> paneExtractor;
 
     private GroupThenAssert(
+        PAssertionSite site,
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
paneExtractor) {
+      this.site = site;
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
@@ -930,7 +1011,7 @@ public class PAssert {
           .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
           .setCoder(IterableCoder.of(input.getCoder()))
-          .apply("RunChecks", ParDo.of(new 
GroupedValuesCheckerDoFn<>(checkerFn)));
+          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(site, 
checkerFn)));
 
       return PDone.in(input.getPipeline());
     }
@@ -942,16 +1023,19 @@ public class PAssert {
    */
   public static class GroupThenAssertForSingleton<T>
       extends PTransform<PCollection<Iterable<T>>, PDone> implements 
Serializable {
+    private final PAssertionSite site;
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
     private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
         paneExtractor;
 
     private GroupThenAssertForSingleton(
+        PAssertionSite site,
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
         SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
             paneExtractor) {
+      this.site = site;
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
@@ -963,7 +1047,7 @@ public class PAssert {
           .apply("GroupGlobally", new 
GroupGlobally<Iterable<T>>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
           .setCoder(IterableCoder.of(input.getCoder()))
-          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
+          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(site, 
checkerFn)));
 
       return PDone.in(input.getPipeline());
     }
@@ -981,14 +1065,17 @@ public class PAssert {
    */
   public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, 
PDone>
       implements Serializable {
+    private final PAssertionSite site;
     private final transient PTransform<PBegin, PCollectionView<ActualT>> 
createActual;
     private final transient PTransform<PCollection<Integer>, 
PCollection<Integer>> windowToken;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
     private OneSideInputAssert(
+        PAssertionSite site,
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         PTransform<PCollection<Integer>, PCollection<Integer>> windowToken,
         SerializableFunction<ActualT, Void> checkerFn) {
+      this.site = site;
       this.createActual = createActual;
       this.windowToken = windowToken;
       this.checkerFn = checkerFn;
@@ -1003,7 +1090,7 @@ public class PAssert {
           .apply("WindowToken", windowToken)
           .apply(
               "RunChecks",
-              ParDo.withSideInputs(actual).of(new 
SideInputCheckerDoFn<>(checkerFn, actual)));
+              ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(site, 
checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
@@ -1017,6 +1104,7 @@ public class PAssert {
    * null values.
    */
   private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, 
Void> {
+    private final PAssertionSite site;
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
@@ -1025,7 +1113,10 @@ public class PAssert {
     private final PCollectionView<ActualT> actual;
 
     private SideInputCheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn, 
PCollectionView<ActualT> actual) {
+        PAssertionSite site,
+        SerializableFunction<ActualT, Void> checkerFn,
+        PCollectionView<ActualT> actual) {
+      this.site = site;
       this.checkerFn = checkerFn;
       this.actual = actual;
     }
@@ -1034,7 +1125,7 @@ public class PAssert {
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
-        doChecks(actualContents, checkerFn, success, failure);
+        doChecks(site, actualContents, checkerFn, success, failure);
       } catch (Throwable t) {
         // Suppress exception in streaming
         if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
@@ -1052,19 +1143,22 @@ public class PAssert {
    * <p>The singleton property is presumed, not enforced.
    */
   private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, 
Void> {
+    private final PAssertionSite site;
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
         createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
-    private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> 
checkerFn) {
+    private GroupedValuesCheckerDoFn(
+        PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) {
+      this.site = site;
       this.checkerFn = checkerFn;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      doChecks(c.element(), checkerFn, success, failure);
+      doChecks(site, c.element(), checkerFn, success, failure);
     }
   }
 
@@ -1077,24 +1171,28 @@ public class PAssert {
    * each input element must be a singleton iterable, or this will fail.
    */
   private static class SingletonCheckerDoFn<ActualT> extends 
DoFn<Iterable<ActualT>, Void> {
+    private final PAssertionSite site;
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, Sum.ofIntegers());
     private final Aggregator<Integer, Integer> failure =
         createAggregator(FAILURE_COUNTER, Sum.ofIntegers());
 
-    private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> 
checkerFn) {
+    private SingletonCheckerDoFn(
+        PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) {
+      this.site = site;
       this.checkerFn = checkerFn;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       ActualT actualContents = Iterables.getOnlyElement(c.element());
-      doChecks(actualContents, checkerFn, success, failure);
+      doChecks(site, actualContents, checkerFn, success, failure);
     }
   }
 
   private static <ActualT> void doChecks(
+      PAssertionSite site,
       ActualT actualContents,
       SerializableFunction<ActualT, Void> checkerFn,
       Aggregator<Integer, Integer> successAggregator,
@@ -1103,9 +1201,8 @@ public class PAssert {
       checkerFn.apply(actualContents);
       successAggregator.addValue(1);
     } catch (Throwable t) {
-      LOG.error("PAssert failed expectations.", t);
       failureAggregator.addValue(1);
-      throw t;
+      throw site.wrap(t);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c62611c7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 1997bbe..e09f54b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
@@ -392,6 +393,49 @@ public class PAssertTest implements Serializable {
     assertThat(thrown.getMessage(), containsString("Expected: iterable over [] 
in any order"));
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testAssertionSiteIsCapturedWithMessage() throws Exception {
+    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    assertThatCollectionIsEmptyWithMessage(vals);
+
+    Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+    assertThat(
+        thrown.getMessage(),
+        containsString("Should be empty"));
+    assertThat(
+        thrown.getMessage(),
+        containsString("Expected: iterable over [] in any order"));
+    String stacktrace = Throwables.getStackTraceAsString(thrown);
+    assertThat(stacktrace, 
containsString("testAssertionSiteIsCapturedWithMessage"));
+    assertThat(stacktrace, 
containsString("assertThatCollectionIsEmptyWithMessage"));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
+    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    assertThatCollectionIsEmptyWithoutMessage(vals);
+
+    Throwable thrown = runExpectingAssertionFailure(pipeline);
+
+    assertThat(
+        thrown.getMessage(),
+        containsString("Expected: iterable over [] in any order"));
+    String stacktrace = Throwables.getStackTraceAsString(thrown);
+    assertThat(stacktrace, 
containsString("testAssertionSiteIsCapturedWithoutMessage"));
+    assertThat(stacktrace, 
containsString("assertThatCollectionIsEmptyWithoutMessage"));
+  }
+
+  private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> 
vals) {
+    PAssert.that("Should be empty", vals).empty();
+  }
+
+  private static void 
assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) {
+    PAssert.that(vals).empty();
+  }
+
   private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
     // We cannot use thrown.expect(AssertionError.class) because the 
AssertionError
     // is first caught by JUnit and causes a test failure.

Reply via email to