[BEAM-342] Implement Filter#greaterThan,etc with Filter#byPredicate
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d87f8b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d87f8b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d87f8b9 Branch: refs/heads/python-sdk Commit: 3d87f8b987e243c6b3d99ab67142301af7b65743 Parents: 6491100 Author: manuzhang <owenzhang1...@gmail.com> Authored: Wed Jun 15 16:02:35 2016 +0800 Committer: Davor Bonaci <da...@google.com> Committed: Mon Jun 20 15:14:30 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/AutoComplete.java | 2 +- .../examples/complete/AutoCompleteTest.java | 14 +- .../beam/examples/MinimalWordCountJava8.java | 2 +- .../examples/complete/game/HourlyTeamScore.java | 6 +- .../examples/MinimalWordCountJava8Test.java | 2 +- .../complete/game/HourlyTeamScoreTest.java | 2 +- .../flink/examples/streaming/AutoComplete.java | 12 +- .../org/apache/beam/sdk/transforms/Filter.java | 128 +++++++------------ .../apache/beam/sdk/transforms/FilterTest.java | 63 +++------ .../beam/sdk/transforms/FilterJava8Test.java | 8 +- 10 files changed, 89 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index d725e0a..3e4440c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -235,7 +235,7 @@ public class AutoComplete { .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) // ...together with those (previously excluded) candidates of length // exactly minPrefix... - .and(input.apply(Filter.byPredicate( + .and(input.apply(Filter.by( new SerializableFunction<CompletionCandidate, Boolean>() { @Override public Boolean apply(CompletionCandidate c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java index 93dd0be..b2ed9a2 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java @@ -85,13 +85,13 @@ public class AutoCompleteTest implements Serializable { PCollection<KV<String, List<CompletionCandidate>>> output = input.apply(new ComputeTopCompletions(2, recursive)) - .apply(Filter.byPredicate( - new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() { - @Override - public Boolean apply(KV<String, List<CompletionCandidate>> element) { - return element.getKey().length() <= 2; - } - })); + .apply(Filter.by( + new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() { + @Override + public Boolean apply(KV<String, List<CompletionCandidate>> element) { + return element.getKey().length() <= 2; + } + })); PAssert.that(output).containsInAnyOrder( KV.of("a", parseList("apple:2", "apricot:1")), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index d491741..0ad1a04 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -55,7 +55,7 @@ public class MinimalWordCountJava8 { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) - .apply(Filter.byPredicate((String word) -> !word.isEmpty())) + .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index 845c56f..ba3983d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -44,7 +44,7 @@ import java.util.TimeZone; /** * This class is the second in a series of four pipelines that tell a story in a 'gaming' * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore}, - * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}. + * new concepts include: windowing and element timestamps; use of {@code Filter.by()}. * * <p> This pipeline processes data collected from gaming events in batch, building on {@link * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window, @@ -164,10 +164,10 @@ public class HourlyTeamScore extends UserScore { // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events // that fall after the time period we want to analyze. // [START DocInclude_HTSFilters] - .apply("FilterStartTime", Filter.byPredicate( + .apply("FilterStartTime", Filter.by( (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) - .apply("FilterEndTime", Filter.byPredicate( + .apply("FilterEndTime", Filter.by( (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis())) // [END DocInclude_HTSFilters] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index f73250f..4dfa474 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -66,7 +66,7 @@ public class MinimalWordCountJava8Test implements Serializable { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) - .apply(Filter.byPredicate((String word) -> !word.isEmpty())) + .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index 5ff615a..4254902 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -96,7 +96,7 @@ public class HourlyTeamScoreTest implements Serializable { PCollection<KV<String, Integer>> output = input .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) - .apply("FilterStartTime", Filter.byPredicate( + .apply("FilterStartTime", Filter.by( (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) // run a map to access the fields in the result. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index 9d1168b..d83e662 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -215,13 +215,13 @@ public class AutoComplete { // ...together with those (previously excluded) candidates of length // exactly minPrefix... .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() { - private static final long serialVersionUID = 0; + private static final long serialVersionUID = 0; - @Override - public Boolean apply(CompletionCandidate c) { - return c.getValue().length() == minPrefix; - } - }))) + @Override + public Boolean apply(CompletionCandidate c) { + return c.getValue().length() == minPrefix; + } + }))) .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) // ...set the key to be the minPrefix-length prefix... .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java index 57796b8..a31799e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java @@ -41,7 +41,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * <pre> {@code * PCollection<String> wordList = ...; * PCollection<String> longWords = - * wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6))); + * wordList.apply(Filter.by(new MatchIfWordLengthGT(6))); * } </pre> * * <p>See also {@link #lessThan}, {@link #lessThanEq}, @@ -50,25 +50,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * the elements' natural ordering. */ public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> - byPredicate(PredicateT predicate) { - return new Filter<T>("Filter", predicate); - } - - /** - * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of - * a {@link ParDo.Bound}. - */ - @Deprecated - public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T> - by(final PredicateT filterPred) { - return ParDo.named("Filter").of(new DoFn<T, T>() { - @Override - public void processElement(ProcessContext c) { - if (filterPred.apply(c.element()) == true) { - c.output(c.element()); - } - } - }); + by(PredicateT predicate) { + return new Filter<>(predicate); } /** @@ -89,24 +72,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * inequalities with the specified value based on the elements' * natural ordering. * - * <p>See also {@link #byPredicate}, which returns elements + * <p>See also {@link #by}, which returns elements * that satisfy the given predicate. */ - public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) { - return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() { + public static <T extends Comparable<T>> Filter<T> lessThan(final T value) { + return by(new SerializableFunction<T, Boolean>() { @Override - public void processElement(ProcessContext c) { - if (c.element().compareTo(value) < 0) { - c.output(c.element()); - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Filter.populateDisplayData(builder, String.format("x < %s", value)); + public Boolean apply(T input) { + return input.compareTo(value) < 0; } - }); + }).described(String.format("x < %s", value)); } @@ -128,24 +103,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * inequalities with the specified value based on the elements' * natural ordering. * - * <p>See also {@link #byPredicate}, which returns elements + * <p>See also {@link #by}, which returns elements * that satisfy the given predicate. */ - public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) { - return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() { - @Override - public void processElement(ProcessContext c) { - if (c.element().compareTo(value) > 0) { - c.output(c.element()); - } - } - + public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) { + return by(new SerializableFunction<T, Boolean>() { @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Filter.populateDisplayData(builder, String.format("x > %s", value)); + public Boolean apply(T input) { + return input.compareTo(value) > 0; } - }); + }).described(String.format("x > %s", value)); } /** @@ -166,24 +133,16 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * inequalities with the specified value based on the elements' * natural ordering. * - * <p>See also {@link #byPredicate}, which returns elements + * <p>See also {@link #by}, which returns elements * that satisfy the given predicate. */ - public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) { - return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() { - @Override - public void processElement(ProcessContext c) { - if (c.element().compareTo(value) <= 0) { - c.output(c.element()); - } - } - + public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) { + return by(new SerializableFunction<T, Boolean>() { @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Filter.populateDisplayData(builder, String.format("x ⤠%s", value)); + public Boolean apply(T input) { + return input.compareTo(value) <= 0; } - }); + }).described(String.format("x ⤠%s", value)); } /** @@ -204,46 +163,46 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { * inequalities with the specified value based on the elements' * natural ordering. * - * <p>See also {@link #byPredicate}, which returns elements + * <p>See also {@link #by}, which returns elements * that satisfy the given predicate. */ - public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) { - return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() { + public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) { + return by(new SerializableFunction<T, Boolean>() { @Override - public void processElement(ProcessContext c) { - if (c.element().compareTo(value) >= 0) { - c.output(c.element()); - } + public Boolean apply(T input) { + return input.compareTo(value) >= 0; } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Filter.populateDisplayData(builder, String.format("x ⥠%s", value)); - } - }); + }).described(String.format("x ⥠%s", value)); } /////////////////////////////////////////////////////////////////////////////// private SerializableFunction<T, Boolean> predicate; + private String predicateDescription; private Filter(SerializableFunction<T, Boolean> predicate) { - this.predicate = predicate; + this(predicate, "Filter.predicate"); } - private Filter(String name, SerializableFunction<T, Boolean> predicate) { - super(name); + private Filter(SerializableFunction<T, Boolean> predicate, + String predicateDescription) { this.predicate = predicate; + this.predicateDescription = predicateDescription; } - public Filter<T> named(String name) { - return new Filter<>(name, predicate); + /** + * Returns a new {@link Filter} {@link PTransform} that's like this + * {@link PTransform} but with the specified description for {@link DisplayData}. Does not + * modify this {@link PTransform}. + */ + Filter<T> described(String description) { + return new Filter<>(predicate, description); + } @Override public PCollection<T> apply(PCollection<T> input) { - PCollection<T> output = input.apply(ParDo.named("Filter").of(new DoFn<T, T>() { + PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() { @Override public void processElement(ProcessContext c) { if (predicate.apply(c.element()) == true) { @@ -259,8 +218,9 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { return input.getCoder(); } - private static void populateDisplayData( - DisplayData.Builder builder, String predicateDescription) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); builder.add(DisplayData.item("predicate", predicateDescription) .withLabel("Filter Predicate")); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index 367bbc0..2edab05 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.MatcherAssert.assertThat; -import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -61,10 +60,9 @@ public class FilterTest implements Serializable { } } - @Deprecated @Test @Category(RunnableOnService.class) - public void testIdentityFilterBy() { + public void testIdentityFilterByPredicate() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p @@ -75,10 +73,9 @@ public class FilterTest implements Serializable { p.run(); } - @Deprecated @Test - @Category(NeedsRunner.class) - public void testNoFilter() { + @Category(RunnableOnService.class) + public void testNoFilterByPredicate() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p @@ -89,10 +86,9 @@ public class FilterTest implements Serializable { p.run(); } - @Deprecated @Test @Category(RunnableOnService.class) - public void testFilterBy() { + public void testFilterByPredicate() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p @@ -105,81 +101,64 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) - public void testIdentityFilterByPredicate() { - TestPipeline p = TestPipeline.create(); - - PCollection<Integer> output = p - .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) - .apply(Filter.byPredicate(new TrivialFn(true))); - - PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); - p.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testNoFilterByPredicate() { + public void testFilterLessThan() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p - .apply(Create.of(1, 2, 4, 5)) - .apply(Filter.byPredicate(new TrivialFn(false))); + .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) + .apply(Filter.lessThan(4)); - PAssert.that(output).empty(); + PAssert.that(output).containsInAnyOrder(1, 2, 3); p.run(); } @Test @Category(RunnableOnService.class) - public void testFilterByPredicate() { + public void testFilterGreaterThan() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.byPredicate(new EvenFn())); + .apply(Filter.greaterThan(4)); - PAssert.that(output).containsInAnyOrder(2, 4, 6); + PAssert.that(output).containsInAnyOrder(5, 6, 7); p.run(); } @Test @Category(RunnableOnService.class) - public void testFilterLessThan() { + public void testFilterLessThanEq() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.lessThan(4)); + .apply(Filter.lessThanEq(4)); - PAssert.that(output).containsInAnyOrder(1, 2, 3); + PAssert.that(output).containsInAnyOrder(1, 2, 3, 4); p.run(); } @Test @Category(RunnableOnService.class) - public void testFilterGreaterThan() { + public void testFilterGreaterThanEq() { TestPipeline p = TestPipeline.create(); PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.greaterThan(4)); + .apply(Filter.greaterThanEq(4)); - PAssert.that(output).containsInAnyOrder(5, 6, 7); + PAssert.that(output).containsInAnyOrder(4, 5, 6, 7); p.run(); } @Test public void testDisplayData() { - ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123); - assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123")); + assertThat(DisplayData.from(Filter.lessThan(123)), hasDisplayItem("predicate", "x < 123")); - ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234); - assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x ⤠234")); + assertThat(DisplayData.from(Filter.lessThanEq(234)), hasDisplayItem("predicate", "x ⤠234")); - ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345); - assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345")); + assertThat(DisplayData.from(Filter.greaterThan(345)), hasDisplayItem("predicate", "x > 345")); - ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456); - assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x ⥠456")); + assertThat(DisplayData.from(Filter.greaterThanEq(456)), hasDisplayItem("predicate", "x ⥠456")); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d87f8b9/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java index 170071b..3c83be2 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java @@ -50,7 +50,7 @@ public class FilterJava8Test implements Serializable { PCollection<Integer> output = pipeline .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) - .apply(Filter.byPredicate(i -> true)); + .apply(Filter.by(i -> true)); PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); pipeline.run(); @@ -62,7 +62,7 @@ public class FilterJava8Test implements Serializable { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 4, 5)) - .apply(Filter.byPredicate(i -> false)); + .apply(Filter.by(i -> false)); PAssert.that(output).empty(); pipeline.run(); @@ -75,7 +75,7 @@ public class FilterJava8Test implements Serializable { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.byPredicate(i -> i % 2 == 0)); + .apply(Filter.by(i -> i % 2 == 0)); PAssert.that(output).containsInAnyOrder(2, 4, 6); pipeline.run(); @@ -105,7 +105,7 @@ public class FilterJava8Test implements Serializable { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.byPredicate(new EvenFilter()::isEven)); + .apply(Filter.by(new EvenFilter()::isEven)); PAssert.that(output).containsInAnyOrder(2, 4, 6); pipeline.run();