Updated Branches: refs/heads/master 11ba4134e -> f6f965c4d
CRUNCH-112: Clean up FilterFn. Move implementation classes to a new FilterFns utility class. Add new ACCEPT_ALL and REJECT_ALL filters. Rewrite test cases to use the new filters. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f6f965c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f6f965c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f6f965c4 Branch: refs/heads/master Commit: f6f965c4d9963eacb5584a85b667ed7148bccabb Parents: 11ba413 Author: Matthias Friedrich <[email protected]> Authored: Sun Nov 11 21:13:46 2012 +0100 Committer: Matthias Friedrich <[email protected]> Committed: Sun Nov 11 21:13:46 2012 +0100 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 8 +- .../it/java/org/apache/crunch/MaterializeIT.java | 13 +-- .../org/apache/crunch/PCollectionGetSizeIT.java | 15 +-- .../org/apache/crunch/lib/join/MapsideJoinIT.java | 12 +-- .../src/main/java/org/apache/crunch/FilterFn.java | 21 +++- .../main/java/org/apache/crunch/fn/FilterFns.java | 112 +++++++++++++++ .../test/java/org/apache/crunch/FilterFnTest.java | 63 -------- .../java/org/apache/crunch/fn/FilterFnTest.java | 85 +++++++++++ 8 files changed, 226 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java index 8664550..7670e88 100644 --- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; @@ -42,12 +43,7 @@ public class MRPipelineIT implements Serializable { Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration()); PCollection<String> genericCollection = pipeline.readTextFile(textFile.getAbsolutePath()); pipeline.run(); - PCollection<String> filter = genericCollection.filter("Filtering data", new FilterFn<String>() { - @Override - public boolean accept(String input) { - return true; - } - }); + PCollection<String> filter = genericCollection.filter("Filtering data", FilterFns.<String>ACCEPT_ALL()); filter.materialize(); pipeline.run(); File file = tmpDir.getFile("output.txt"); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/MaterializeIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java index 3b4f0e6..d064993 100644 --- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java +++ b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java @@ -23,6 +23,7 @@ import static junit.framework.Assert.assertTrue; import java.io.IOException; import java.util.List; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.Person; @@ -41,16 +42,6 @@ import com.google.common.collect.Lists; public class MaterializeIT { - /** Filter that rejects everything. */ - @SuppressWarnings("serial") - private static class FalseFilterFn extends FilterFn<String> { - - @Override - public boolean accept(final String input) { - return false; - } - } - @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); @@ -112,7 +103,7 @@ public class MaterializeIT { public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { String inputPath = tmpDir.copyResourceFileName("set1.txt"); - PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn()); + PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL()); assertTrue(Lists.newArrayList(empty.materialize()).isEmpty()); pipeline.done(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java index 8182b30..44eb897 100644 --- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java +++ b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertThat; import java.io.IOException; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; @@ -43,16 +44,6 @@ public class PCollectionGetSizeIT { private String nonEmptyInputPath; private String outputPath; - /** Filter that rejects everything. */ - @SuppressWarnings("serial") - private static class FalseFilterFn extends FilterFn<String> { - - @Override - public boolean accept(final String input) { - return false; - } - } - @Before public void setUp() throws IOException { emptyInputPath = tmpDir.copyResourceFileName("emptyTextFile.txt"); @@ -105,7 +96,7 @@ public class PCollectionGetSizeIT { PCollection<String> data = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()) .readTextFile(nonEmptyInputPath); - PCollection<String> emptyPCollection = data.filter(new FalseFilterFn()); + PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL()); assertThat(emptyPCollection.getSize(), is(0L)); } @@ -139,7 +130,7 @@ public class PCollectionGetSizeIT { PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath); - PCollection<String> emptyPCollection = data.filter(new FalseFilterFn()); + PCollection<String> emptyPCollection = data.filter(FilterFns.<String>REJECT_ALL()); emptyPCollection.write(sequenceFile(outputPath, strings())); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java index 9982ba4..297680e 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java @@ -24,11 +24,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import org.apache.crunch.FilterFn; import org.apache.crunch.MapFn; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; @@ -73,14 +73,6 @@ public class MapsideJoinIT { } - private static class NegativeFilter extends FilterFn<Pair<Integer, String>> { - - @Override - public boolean accept(Pair<Integer, String> input) { - return false; - } - - } @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); @@ -96,7 +88,7 @@ public class MapsideJoinIT { PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); PTable<Integer, String> filteredOrderTable = orderTable - .parallelDo(new NegativeFilter(), orderTable.getPTableType()); + .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType()); PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/main/java/org/apache/crunch/FilterFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java index a9612ac..467400f 100644 --- a/crunch/src/main/java/org/apache/crunch/FilterFn.java +++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java @@ -19,12 +19,13 @@ package org.apache.crunch; import java.util.List; +import org.apache.crunch.fn.FilterFns; + import com.google.common.collect.ImmutableList; /** * A {@link DoFn} for the common case of filtering the members of a * {@link PCollection} based on a boolean condition. - * */ public abstract class FilterFn<T> extends DoFn<T, T> { @@ -45,10 +46,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> { return 0.5f; } + /** + * @deprecated Use {@link FilterFns#and(FilterFn...)} + */ public static <S> FilterFn<S> and(FilterFn<S>... fns) { return new AndFn<S>(fns); } + /** + * @deprecated Use {@link FilterFns#and(FilterFn...)} + */ public static class AndFn<S> extends FilterFn<S> { private final List<FilterFn<S>> fns; @@ -77,10 +84,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> { } } + /** + * @deprecated Use {@link FilterFns#or(FilterFn...)} + */ public static <S> FilterFn<S> or(FilterFn<S>... fns) { return new OrFn<S>(fns); } + /** + * @deprecated Use {@link FilterFns#or(FilterFn...)} + */ public static class OrFn<S> extends FilterFn<S> { private final List<FilterFn<S>> fns; @@ -109,10 +122,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> { } } + /** + * @deprecated Use {@link FilterFns#not(FilterFn)} + */ public static <S> FilterFn<S> not(FilterFn<S> fn) { return new NotFn<S>(fn); } + /** + * @deprecated Use {@link FilterFns#not(FilterFn)} + */ public static class NotFn<S> extends FilterFn<S> { private final FilterFn<S> base; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java new file mode 100644 index 0000000..8dc4268 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.FilterFn; +import org.apache.crunch.FilterFn.AndFn; +import org.apache.crunch.FilterFn.NotFn; +import org.apache.crunch.FilterFn.OrFn; + + +/** + * A collection of pre-defined {@link FilterFn} implementations. + */ +public final class FilterFns { + // Note: We delegate to the deprecated implementation classes in FilterFn. When their + // time is up, we just move them here. + + private FilterFns() { + // utility class, not for instantiation + } + + /** + * Accept an entry if all of the given filters accept it, using short-circuit evaluation. + * @param fn1 The first functions to delegate to + * @param fn2 The second functions to delegate to + * @return The composed filter function + */ + public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) { + return new AndFn<S>(fn1, fn2); + } + + /** + * Accept an entry if all of the given filters accept it, using short-circuit evaluation. + * @param fns The functions to delegate to (in the given order) + * @return The composed filter function + */ + public static <S> FilterFn<S> and(FilterFn<S>... fns) { + return new AndFn<S>(fns); + } + + /** + * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation. + * @param fn1 The first functions to delegate to + * @param fn2 The second functions to delegate to + * @return The composed filter function + */ + public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) { + return new OrFn<S>(fn1, fn2); + } + + /** + * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation. + * @param fns The functions to delegate to (in the given order) + * @return The composed filter function + */ + public static <S> FilterFn<S> or(FilterFn<S>... fns) { + return new OrFn<S>(fns); + } + + /** + * Accept an entry if the given filter <em>does not</em> accept it. + * @param fn The function to delegate to + * @return The composed filter function + */ + public static <S> FilterFn<S> not(FilterFn<S> fn) { + return new NotFn<S>(fn); + } + + /** + * Accept everything. + * @return A filter function that accepts everything. + */ + public static <S> FilterFn<S> ACCEPT_ALL() { + return new AcceptAllFn<S>(); + } + + /** + * Reject everything. + * @return A filter function that rejects everything. + */ + public static <S> FilterFn<S> REJECT_ALL() { + return not(new AcceptAllFn<S>()); + } + + private static class AcceptAllFn<S> extends FilterFn<S> { + @Override + public boolean accept(S input) { + return true; + } + + @Override + public float scaleFactor() { + return 1.0f; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/test/java/org/apache/crunch/FilterFnTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java deleted file mode 100644 index 9de086d..0000000 --- a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -@SuppressWarnings("serial") -public class FilterFnTest { - - private static final FilterFn<String> TRUE = new FilterFn<String>() { - @Override - public boolean accept(String input) { - return true; - } - }; - - private static final FilterFn<String> FALSE = new FilterFn<String>() { - @Override - public boolean accept(String input) { - return false; - } - }; - - @Test - public void testAnd() { - assertTrue(FilterFn.and(TRUE).accept("foo")); - assertTrue(FilterFn.and(TRUE, TRUE).accept("foo")); - assertFalse(FilterFn.and(TRUE, FALSE).accept("foo")); - assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo")); - } - - @Test - public void testOr() { - assertFalse(FilterFn.or(FALSE).accept("foo")); - assertTrue(FilterFn.or(FALSE, TRUE).accept("foo")); - assertTrue(FilterFn.or(TRUE, FALSE, TRUE).accept("foo")); - assertFalse(FilterFn.or(FALSE, FALSE, FALSE).accept("foo")); - } - - @Test - public void testNot() { - assertFalse(FilterFn.not(TRUE).accept("foo")); - assertTrue(FilterFn.not(FALSE).accept("foo")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java b/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java new file mode 100644 index 0000000..a649f99 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.crunch.FilterFn; +import org.junit.Test; + +import com.google.common.base.Predicates; + + +public class FilterFnTest { + + private static final FilterFn<String> TRUE = FilterFns.<String>ACCEPT_ALL(); + private static final FilterFn<String> FALSE = FilterFns.<String>REJECT_ALL(); + + @Test + public void testAcceptAll() { + assertThat(TRUE.accept(""), is(true)); + assertThat(TRUE.accept("foo"), is(true)); + } + + @Test + public void testRejectAll() { + assertThat(FALSE.accept(""), is(false)); + assertThat(FALSE.accept("foo"), is(false)); + + Predicates.or(Predicates.alwaysFalse(), Predicates.alwaysTrue()); + } + + @Test + public void testAnd() { + assertThat(FilterFns.and(TRUE, TRUE).accept("foo"), is(true)); + assertThat(FilterFns.and(TRUE, FALSE).accept("foo"), is(false)); + } + + @Test + @SuppressWarnings("unchecked") + public void testGeneric() { + assertThat(FilterFns.and(TRUE).accept("foo"), is(true)); + assertThat(FilterFns.and(FALSE).accept("foo"), is(false)); + assertThat(FilterFns.and(FALSE, FALSE, FALSE).accept("foo"), is(false)); + assertThat(FilterFns.and(TRUE, TRUE, FALSE).accept("foo"), is(false)); + assertThat(FilterFns.and(FALSE, FALSE, FALSE, FALSE).accept("foo"), is(false)); + } + + @Test + public void testOr() { + assertThat(FilterFns.or(FALSE, TRUE).accept("foo"), is(true)); + assertThat(FilterFns.or(TRUE, FALSE).accept("foo"), is(true)); + } + + @Test + @SuppressWarnings("unchecked") + public void testOrGeneric() { + assertThat(FilterFns.or(TRUE).accept("foo"), is(true)); + assertThat(FilterFns.or(FALSE).accept("foo"), is(false)); + assertThat(FilterFns.or(TRUE, FALSE, TRUE).accept("foo"), is(true)); + assertThat(FilterFns.or(FALSE, FALSE, TRUE).accept("foo"), is(true)); + assertThat(FilterFns.or(FALSE, FALSE, FALSE).accept("foo"), is(false)); + } + + @Test + public void testNot() { + assertThat(FilterFns.not(TRUE).accept("foo"), is(false)); + assertThat(FilterFns.not(FALSE).accept("foo"), is(true)); + } +}
