Repository: beam Updated Branches: refs/heads/master 055f452c0 -> 51820cbe0
Captures assertion site and message in PAssert This makes PAssert failures quite a bit easier to debug. Example message after this commit: java.lang.AssertionError: Some message at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:384) at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:279) at org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestamped(SplittableDoFnTest.java:234) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ... Caused by: java.lang.AssertionError: Expected: iterable over [<TimestampedValue(KV{z, 0}, 2017-01-10T00:38:28.000Z)>, <TimestampedValue(KV{bb, 0}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{bb, 1}, 2017-01-10T00:38:29.000Z)>, <TimestampedValue(KV{ccccc, 0}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 1}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 2}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 3}, 2017-01-10T00:38:30.000Z)>, <TimestampedValue(KV{ccccc, 4}, 2017-01-10T00:38:30.000Z)>] in any order but: Not matched: <TimestampedValue(KV{a, 0}, 2017-01-10T00:38:28.000Z)> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1270) at org.apache.beam.sdk.testing.PAssert$AssertContainsInAnyOrder.apply(PAssert.java:1) ... (as opposed to, basically, just the "Caused by" part) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c62611c7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c62611c7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c62611c7 Branch: refs/heads/master Commit: c62611c73ab0f9a5769f3ee9b28b11e917628f78 Parents: b81bd25 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Jan 9 16:38:50 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Jan 10 21:45:21 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/testing/PAssert.java | 163 +++++++++++++++---- .../apache/beam/sdk/testing/PAssertTest.java | 44 +++++ 2 files changed, 174 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c62611c7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index b57f4a9..89d6fea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -275,8 +275,13 @@ public class PAssert { /** * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}. */ + public static <T> IterableAssert<T> that(String message, PCollection<T> actual) { + return new PCollectionContentsAssert<>(PAssertionSite.capture(message), actual); + } + + /** @see #that(String, PCollection) */ public static <T> IterableAssert<T> that(PCollection<T> actual) { - return new PCollectionContentsAssert<>(actual); + return that("", actual); } /** @@ -284,7 +289,7 @@ public class PAssert { * must contain a single {@code Iterable<T>} value. */ public static <T> IterableAssert<T> thatSingletonIterable( - PCollection<? extends Iterable<T>> actual) { + String message, PCollection<? extends Iterable<T>> actual) { try { } catch (NoSuchElementException | IllegalArgumentException exc) { @@ -297,15 +302,29 @@ public class PAssert { @SuppressWarnings("unchecked") // Safe covariant cast PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual; - return new PCollectionSingletonIterableAssert<>(actualIterables); + return new PCollectionSingletonIterableAssert<>( + PAssertionSite.capture(message), actualIterables); } - /** - * Constructs a {@link SingletonAssert} for the value of the provided - * {@code PCollection PCollection<T>}, which must be a singleton. - */ + /** @see #thatSingletonIterable(String, PCollection) */ + public static <T> IterableAssert<T> thatSingletonIterable( + PCollection<? extends Iterable<T>> actual) { + return thatSingletonIterable("", actual); + } + + /** + * Constructs a {@link SingletonAssert} for the value of the provided + * {@code PCollection PCollection<T>}, which must be a singleton. + */ + public static <T> SingletonAssert<T> thatSingleton(String message, PCollection<T> actual) { + return new PCollectionViewAssert<>( + PAssertionSite.capture(message), + actual, View.<T>asSingleton(), actual.getCoder()); + } + + /** @see #thatSingleton(String, PCollection) */ public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) { - return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder()); + return thatSingleton("", actual); } /** @@ -315,48 +334,90 @@ public class PAssert { * {@code Coder<K, V>}. */ public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( + String message, PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); return new PCollectionViewAssert<>( + PAssertionSite.capture(message), actual, View.<K, V>asMultimap(), MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); } + /** @see #thatMultimap(String, PCollection) */ + public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap( + PCollection<KV<K, V>> actual) { + return thatMultimap("", actual); + } + /** * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which * must have at most one value per key. * - * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any - * {@code Coder<K, V>}. + * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any {@code Coder<K, + * V>}. */ - public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { + public static <K, V> SingletonAssert<Map<K, V>> thatMap( + String message, PCollection<KV<K, V>> actual) { @SuppressWarnings("unchecked") KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder(); return new PCollectionViewAssert<>( - actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); + PAssertionSite.capture(message), + actual, + View.<K, V>asMap(), + MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); + } + + /** @see #thatMap(String, PCollection) */ + public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) { + return thatMap("", actual); } //////////////////////////////////////////////////////////// + private static class PAssertionSite implements Serializable { + private final String message; + private final StackTraceElement[] creationStackTrace; + + static PAssertionSite capture(String message) { + return new PAssertionSite(message, new Throwable().getStackTrace()); + } + + PAssertionSite(String message, StackTraceElement[] creationStackTrace) { + this.message = message; + this.creationStackTrace = creationStackTrace; + } + + public AssertionError wrap(Throwable t) { + AssertionError res = + new AssertionError( + message.isEmpty() ? t.getMessage() : (message + ": " + t.getMessage()), t); + res.setStackTrace(creationStackTrace); + return res; + } + } + /** * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require * the runner to support side inputs. */ private static class PCollectionContentsAssert<T> implements IterableAssert<T> { + private final PAssertionSite site; private final PCollection<T> actual; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; - public PCollectionContentsAssert(PCollection<T> actual) { - this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes()); + public PCollectionContentsAssert(PAssertionSite site, PCollection<T> actual) { + this(site, actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes()); } public PCollectionContentsAssert( + PAssertionSite site, PCollection<T> actual, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { + this.site = site; this.actual = actual; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -394,7 +455,7 @@ public class PAssert { Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionContentsAssert<>( - actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor); + site, actual, IntoStaticWindows.<T>of(windowCoder, window), paneExtractor); } /** @@ -429,7 +490,7 @@ public class PAssert { SerializableFunction<Iterable<T>, Void> checkerFn) { actual.apply( nextAssertionName(), - new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -471,7 +532,7 @@ public class PAssert { (SerializableFunction) new MatcherCheckerFn<>(matcher); actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(site, checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -518,21 +579,26 @@ public class PAssert { * This does not require the runner to support side inputs. */ private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> { + private final PAssertionSite site; private final PCollection<Iterable<T>> actual; private final Coder<T> elementCoder; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; - public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) { - this(actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane()); + public PCollectionSingletonIterableAssert( + PAssertionSite site, PCollection<Iterable<T>> actual) { + this( + site, actual, IntoGlobalWindow.<Iterable<T>>of(), PaneExtractors.<Iterable<T>>onlyPane()); } public PCollectionSingletonIterableAssert( + PAssertionSite site, PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { + this.site = site; this.actual = actual; @SuppressWarnings("unchecked") @@ -576,7 +642,7 @@ public class PAssert { Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionSingletonIterableAssert<>( - actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor); + site, actual, IntoStaticWindows.<Iterable<T>>of(windowCoder, window), paneExtractor); } @Override @@ -600,7 +666,7 @@ public class PAssert { SerializableFunction<Iterable<T>, Void> checkerFn) { actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssertForSingleton<>(site, checkerFn, rewindowingStrategy, paneExtractor)); return this; } @@ -617,6 +683,7 @@ public class PAssert { * of type {@code ViewT}. This requires side input support from the runner. */ private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> { + private final PAssertionSite site; private final PCollection<ElemT> actual; private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view; private final AssertionWindows rewindowActuals; @@ -625,18 +692,27 @@ public class PAssert { private final Coder<ViewT> coder; protected PCollectionViewAssert( + PAssertionSite site, PCollection<ElemT> actual, PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, Coder<ViewT> coder) { - this(actual, view, IntoGlobalWindow.<ElemT>of(), PaneExtractors.<ElemT>onlyPane(), coder); + this( + site, + actual, + view, + IntoGlobalWindow.<ElemT>of(), + PaneExtractors.<ElemT>onlyPane(), + coder); } private PCollectionViewAssert( + PAssertionSite site, PCollection<ElemT> actual, PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, AssertionWindows rewindowActuals, SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor, Coder<ViewT> coder) { + this.site = site; this.actual = actual; this.view = view; this.rewindowActuals = rewindowActuals; @@ -663,6 +739,7 @@ public class PAssert { BoundedWindow window, SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor) { return new PCollectionViewAssert<>( + site, actual, view, IntoStaticWindows.of( @@ -689,6 +766,7 @@ public class PAssert { .apply( "PAssert$" + (assertCount++), new OneSideInputAssert<ViewT>( + site, CreateActual.from(actual, rewindowActuals, paneExtractor, view), rewindowActuals.<Integer>windowDummy(), checkerFn)); @@ -911,14 +989,17 @@ public class PAssert { */ public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone> implements Serializable { + private final PAssertionSite site; private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; private GroupThenAssert( + PAssertionSite site, SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { + this.site = site; this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -930,7 +1011,7 @@ public class PAssert { .apply("GroupGlobally", new GroupGlobally<T>(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn))); + .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(site, checkerFn))); return PDone.in(input.getPipeline()); } @@ -942,16 +1023,19 @@ public class PAssert { */ public static class GroupThenAssertForSingleton<T> extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable { + private final PAssertionSite site; private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; private GroupThenAssertForSingleton( + PAssertionSite site, SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy, SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { + this.site = site; this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -963,7 +1047,7 @@ public class PAssert { .apply("GroupGlobally", new GroupGlobally<Iterable<T>>(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn))); + .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(site, checkerFn))); return PDone.in(input.getPipeline()); } @@ -981,14 +1065,17 @@ public class PAssert { */ public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone> implements Serializable { + private final PAssertionSite site; private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual; private final transient PTransform<PCollection<Integer>, PCollection<Integer>> windowToken; private final SerializableFunction<ActualT, Void> checkerFn; private OneSideInputAssert( + PAssertionSite site, PTransform<PBegin, PCollectionView<ActualT>> createActual, PTransform<PCollection<Integer>, PCollection<Integer>> windowToken, SerializableFunction<ActualT, Void> checkerFn) { + this.site = site; this.createActual = createActual; this.windowToken = windowToken; this.checkerFn = checkerFn; @@ -1003,7 +1090,7 @@ public class PAssert { .apply("WindowToken", windowToken) .apply( "RunChecks", - ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual))); + ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(site, checkerFn, actual))); return PDone.in(input.getPipeline()); } @@ -1017,6 +1104,7 @@ public class PAssert { * null values. */ private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> { + private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); @@ -1025,7 +1113,10 @@ public class PAssert { private final PCollectionView<ActualT> actual; private SideInputCheckerDoFn( - SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) { + PAssertionSite site, + SerializableFunction<ActualT, Void> checkerFn, + PCollectionView<ActualT> actual) { + this.site = site; this.checkerFn = checkerFn; this.actual = actual; } @@ -1034,7 +1125,7 @@ public class PAssert { public void processElement(ProcessContext c) { try { ActualT actualContents = c.sideInput(actual); - doChecks(actualContents, checkerFn, success, failure); + doChecks(site, actualContents, checkerFn, success, failure); } catch (Throwable t) { // Suppress exception in streaming if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { @@ -1052,19 +1143,22 @@ public class PAssert { * <p>The singleton property is presumed, not enforced. */ private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> { + private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); private final Aggregator<Integer, Integer> failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); - private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { + private GroupedValuesCheckerDoFn( + PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) { + this.site = site; this.checkerFn = checkerFn; } @ProcessElement public void processElement(ProcessContext c) { - doChecks(c.element(), checkerFn, success, failure); + doChecks(site, c.element(), checkerFn, success, failure); } } @@ -1077,24 +1171,28 @@ public class PAssert { * each input element must be a singleton iterable, or this will fail. */ private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> { + private final PAssertionSite site; private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, Sum.ofIntegers()); private final Aggregator<Integer, Integer> failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); - private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) { + private SingletonCheckerDoFn( + PAssertionSite site, SerializableFunction<ActualT, Void> checkerFn) { + this.site = site; this.checkerFn = checkerFn; } @ProcessElement public void processElement(ProcessContext c) { ActualT actualContents = Iterables.getOnlyElement(c.element()); - doChecks(actualContents, checkerFn, success, failure); + doChecks(site, actualContents, checkerFn, success, failure); } } private static <ActualT> void doChecks( + PAssertionSite site, ActualT actualContents, SerializableFunction<ActualT, Void> checkerFn, Aggregator<Integer, Integer> successAggregator, @@ -1103,9 +1201,8 @@ public class PAssert { checkerFn.apply(actualContents); successAggregator.addValue(1); } catch (Throwable t) { - LOG.error("PAssert failed expectations.", t); failureAggregator.addValue(1); - throw t; + throw site.wrap(t); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c62611c7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 1997bbe..e09f54b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; @@ -392,6 +393,49 @@ public class PAssertTest implements Serializable { assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order")); } + @Test + @Category(RunnableOnService.class) + public void testAssertionSiteIsCapturedWithMessage() throws Exception { + PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + assertThatCollectionIsEmptyWithMessage(vals); + + Throwable thrown = runExpectingAssertionFailure(pipeline); + + assertThat( + thrown.getMessage(), + containsString("Should be empty")); + assertThat( + thrown.getMessage(), + containsString("Expected: iterable over [] in any order")); + String stacktrace = Throwables.getStackTraceAsString(thrown); + assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage")); + assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage")); + } + + @Test + @Category(RunnableOnService.class) + public void testAssertionSiteIsCapturedWithoutMessage() throws Exception { + PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + assertThatCollectionIsEmptyWithoutMessage(vals); + + Throwable thrown = runExpectingAssertionFailure(pipeline); + + assertThat( + thrown.getMessage(), + containsString("Expected: iterable over [] in any order")); + String stacktrace = Throwables.getStackTraceAsString(thrown); + assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage")); + assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage")); + } + + private static void assertThatCollectionIsEmptyWithMessage(PCollection<Long> vals) { + PAssert.that("Should be empty", vals).empty(); + } + + private static void assertThatCollectionIsEmptyWithoutMessage(PCollection<Long> vals) { + PAssert.that(vals).empty(); + } + private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { // We cannot use thrown.expect(AssertionError.class) because the AssertionError // is first caught by JUnit and causes a test failure.