Updated Branches: refs/heads/master 29de385be -> dd3ae01ae
CRUNCH-135: Remove sample and sort from PCollection interface Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/dd3ae01a Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/dd3ae01a Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/dd3ae01a Branch: refs/heads/master Commit: dd3ae01ae88707dc5490a2e8294fb6c31ef11e25 Parents: 29de385 Author: Josh Wills <[email protected]> Authored: Tue Dec 18 11:14:59 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Dec 18 11:45:05 2012 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/scrunch/PCollection.scala | 6 ++-- .../src/it/java/org/apache/crunch/lib/SortIT.java | 8 ++++-- .../main/java/org/apache/crunch/PCollection.java | 18 --------------- crunch/src/main/java/org/apache/crunch/PTable.java | 17 ++++++++++++++ .../crunch/impl/mem/collect/MemCollection.java | 17 -------------- .../apache/crunch/impl/mem/collect/MemTable.java | 11 +++++++++ .../crunch/impl/mr/collect/PCollectionImpl.java | 17 -------------- .../apache/crunch/impl/mr/collect/PTableBase.java | 11 +++++++++ .../java/org/apache/crunch/lib/SampleTest.java | 5 ++- 9 files changed, 50 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index ac2242f..49ee6c0 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConversions import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} import org.apache.crunch.{PCollection => JCollection, Pair => CPair} -import org.apache.crunch.lib.{Aggregate, Cartesian} +import org.apache.crunch.lib.{Aggregate, Cartesian, Sample} import org.apache.crunch.scrunch.Conversions._ import org.apache.crunch.scrunch.interpreter.InterpreterRunner @@ -77,11 +77,11 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol def min()(implicit converter: Converter[S, S]) = PObject(Aggregate.min(native))(converter) def sample(acceptanceProbability: Double) = { - wrap(native.sample(acceptanceProbability)) + wrap(Sample.sample(native, acceptanceProbability)) } def sample(acceptanceProbability: Double, seed: Long) = { - wrap(native.sample(acceptanceProbability, seed)) + wrap(Sample.sample(native, seed, acceptanceProbability)) } def pType = native.getPType() http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java index 4a22a51..3ea31ca 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java @@ -151,15 +151,17 @@ public class SortIT implements Serializable { public void testAvroReflectSortPair() throws IOException { Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()); pipeline.enableDebug(); - PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) + String rsrc = tmpDir.copyResourceFileName("set2.txt"); + PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc) .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() { @Override public Pair<String, StringWrapper> map(String input) { return Pair.of(input, wrap(input)); } - }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true); - + }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))); + PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Order.ASCENDING); + List<Pair<String, StringWrapper>> expected = Lists.newArrayList(); expected.add(Pair.of("a", wrap("a"))); expected.add(Pair.of("c", wrap("c"))); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java index f5a3465..00c300f 100644 --- a/crunch/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch/src/main/java/org/apache/crunch/PCollection.java @@ -176,12 +176,6 @@ public interface PCollection<S> { <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType); /** - * Returns a {@code PCollection} instance that contains all of the elements of - * this instance in sorted order. - */ - PCollection<S> sort(boolean ascending); - - /** * Returns a {@code PTable} instance that contains the counts of each unique * element of this PCollection. */ @@ -196,16 +190,4 @@ public interface PCollection<S> { * Returns a {@code PObject} of the minimum element of this instance. */ PObject<S> min(); - - /** - * Randomly sample items from this PCollection instance with the given - * probability of an item being accepted. - */ - PCollection<S> sample(double acceptanceProbability); - - /** - * Randomly sample items from this PCollection instance with the given - * probability of an item being accepted and using the given seed. - */ - PCollection<S> sample(double acceptanceProbability, long seed); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/PTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java index e827603..b754a2c 100644 --- a/crunch/src/main/java/org/apache/crunch/PTable.java +++ b/crunch/src/main/java/org/apache/crunch/PTable.java @@ -89,6 +89,23 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { PTable<K, Collection<V>> collectValues(); /** + * Apply the given filter function to this instance and return the resulting + * {@code PTable}. + */ + PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn); + + /** + * Apply the given filter function to this instance and return the resulting + * {@code PTable}. + * + * @param name + * An identifier for this processing step + * @param filterFn + * The {@code FilterFn} to apply + */ + PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn); + + /** * Returns a PTable made up of the pairs in this PTable with the largest value * field. * http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 61bb1e7..35f64ce 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -37,8 +37,6 @@ import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mem.emit.InMemoryEmitter; import org.apache.crunch.lib.Aggregate; -import org.apache.crunch.lib.Sample; -import org.apache.crunch.lib.Sort; import org.apache.crunch.materialize.pobject.CollectionPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -183,16 +181,6 @@ public class MemCollection<S> implements PCollection<S> { } @Override - public PCollection<S> sample(double acceptanceProbability) { - return Sample.sample(this, acceptanceProbability); - } - - @Override - public PCollection<S> sample(double acceptanceProbability, long seed) { - return Sample.sample(this, seed, acceptanceProbability); - } - - @Override public PObject<S> max() { return Aggregate.max(this); } @@ -203,11 +191,6 @@ public class MemCollection<S> implements PCollection<S> { } @Override - public PCollection<S> sort(boolean ascending) { - return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING); - } - - @Override public PCollection<S> filter(FilterFn<S> filterFn) { return parallelDo(filterFn, getPType()); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index 56dc69a..524d492 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; @@ -107,6 +108,16 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override + public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) { + return parallelDo(filterFn, getPTableType()); + } + + @Override + public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) { + return parallelDo(name, filterFn, getPTableType()); + } + + @Override public PTable<K, V> top(int count) { return Aggregate.top(this, count, true); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index f0d8187..79fe72b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -37,8 +37,6 @@ import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; import org.apache.crunch.lib.Aggregate; -import org.apache.crunch.lib.Sample; -import org.apache.crunch.lib.Sort; import org.apache.crunch.materialize.pobject.CollectionPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -153,11 +151,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override - public PCollection<S> sort(boolean ascending) { - return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING); - } - - @Override public PTable<S, Long> count() { return Aggregate.count(this); } @@ -178,16 +171,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override - public PCollection<S> sample(double acceptanceProbability) { - return Sample.sample(this, acceptanceProbability); - } - - @Override - public PCollection<S> sample(double acceptanceProbability, long seed) { - return Sample.sample(this, seed, acceptanceProbability); - } - - @Override public PTypeFamily getTypeFamily() { return getPType().getFamily(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java index 9183784..03c2fdc 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -80,6 +81,16 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P } @Override + public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) { + return parallelDo(filterFn, getPTableType()); + } + + @Override + public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) { + return parallelDo(name, filterFn, getPTableType()); + } + + @Override public PTable<K, V> top(int count) { return Aggregate.top(this, count, true); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java index 0f75fb6..69fd074 100644 --- a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java +++ b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import java.util.List; +import org.apache.crunch.PCollection; import org.apache.crunch.impl.mem.MemPipeline; import org.junit.Test; @@ -29,8 +30,8 @@ import com.google.common.collect.ImmutableList; public class SampleTest { @Test public void testSampler() { - Iterable<Integer> sample = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).sample(0.2, 123998) - .materialize(); + PCollection<Integer> pcollect = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + Iterable<Integer> sample = Sample.sample(pcollect, 123998, 0.2).materialize(); List<Integer> sampleValues = ImmutableList.copyOf(sample); assertEquals(ImmutableList.of(6, 7), sampleValues); }
