CRUNCH-57. PCollections should have a length function that yields the number of elements in the PCollection.
Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8cea3d02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8cea3d02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8cea3d02 Branch: refs/heads/master Commit: 8cea3d02e06a7038662fd57f4dd7f440d4066276 Parents: d49c07a Author: Kiyan Ahmadizadeh <[email protected]> Authored: Tue Sep 4 20:38:23 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Sep 14 03:53:07 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/scrunch/LengthTest.scala | 39 ++++++++ .../org/apache/crunch/scrunch/PCollection.scala | 2 +- .../apache/crunch/scrunch/PCollectionLike.scala | 15 +++- .../scala/org/apache/crunch/scrunch/PObject.scala | 5 +- .../org/apache/crunch/CollectionsLengthIT.java | 76 +++++++++++++++ .../main/java/org/apache/crunch/PCollection.java | 7 ++ .../crunch/impl/mem/collect/MemCollection.java | 5 + .../crunch/impl/mr/collect/PCollectionImpl.java | 5 + .../main/java/org/apache/crunch/lib/Aggregate.java | 20 ++++ 9 files changed, 168 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala new file mode 100644 index 0000000..4a53e89 --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch + +import org.apache.crunch.io.{From => from, To => to} +import org.apache.crunch.test.CrunchTestSupport + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.Test + +/** + * Tests computing the number of elements in a PCollection from the Scala api. 
+ */ +class LengthTest extends CrunchTestSupport with JUnitSuite { + + @Test def testLength { + val linesInShakespear: Long = 3667 + val pipeline = Pipeline.mapReduce[LengthTest](tempDir.getDefaultConfiguration) + val input = tempDir.copyResourceFileName("shakes.txt") + + val len = pipeline.read(from.textFile(input)).length() + assert(linesInShakespear == len.value()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala index 17a8c07..04f2a56 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala @@ -20,7 +20,7 @@ package org.apache.crunch.scrunch import scala.collection.JavaConversions import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} -import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target} +import org.apache.crunch.{PCollection => JCollection, Pair => CPair} import org.apache.crunch.lib.{Aggregate, Cartesian} import org.apache.crunch.scrunch.Conversions._ import org.apache.crunch.scrunch.interpreter.InterpreterRunner http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index d8d74fc..5aee5cf 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ 
b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -23,9 +23,9 @@ import org.apache.crunch.types.{PType, PTableType} trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { val native: NativeType - + def wrap(newNative: AnyRef): FullType - + def write(target: Target): FullType = wrap(native.write(target)) def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = { @@ -43,6 +43,15 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = { new PTable[K, V](native.parallelDo(name, fn, ptype)) } - + + /** + * Gets the number of elements represented by this PCollection. + * + * @return The number of elements in this PCollection. + */ + def length(): PObject[java.lang.Long] = { + PObject(native.length()) + } + def getTypeFamily() = Avros } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala index d52cb2c..5dcead4 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala @@ -18,7 +18,6 @@ package org.apache.crunch.scrunch import org.apache.crunch.{PObject => JPObject} -import org.apache.crunch.Target /** * Represents a singleton value that results from a distributed computation. @@ -34,7 +33,9 @@ class PObject[T] private (private val native: JPObject[T]) { * * @return The value associated with this PObject. 
*/ - def value(): T = native.getValue() + def value(): T = { + native.getValue() + } } /** http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java new file mode 100644 index 0000000..60385f0 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.Long; +import java.util.Collection; + +import org.apache.crunch.PObject; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +/** Integration test: PCollection#length() must report LINES_IN_SHAKESPEAR for shakes.txt on both MRPipeline and MemPipeline. */ +@SuppressWarnings("serial") +public class CollectionsLengthIT { + + public static final Long LINES_IN_SHAKESPEAR = 3667L; + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testWritables() throws IOException { + run(new MRPipeline(CollectionsLengthIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + } + + @Test + public void testAvro() throws IOException { + run(new MRPipeline(CollectionsLengthIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance()); + } + + @Test + public void testInMemoryWritables() throws IOException { + run(MemPipeline.getInstance(), WritableTypeFamily.getInstance()); + } + + @Test + public void testInMemoryAvro() throws IOException { + run(MemPipeline.getInstance(), AvroTypeFamily.getInstance()); + } + + public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); + // NOTE(review): typeFamily is never used below, so the Writable and Avro variants run identical code; TODO confirm whether the input should be read with typeFamily.strings(). + PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath); + Long length = shakespeare.length().getValue(); + assertEquals("Incorrect length for shakespear PCollection.", LINES_IN_SHAKESPEAR, length); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java index 8e73159..f5a3465 100644 --- a/crunch/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch/src/main/java/org/apache/crunch/PCollection.java @@ -130,6 +130,13 @@ public interface PCollection<S> { long getSize(); /** + * Returns the number of elements represented by this {@code PCollection}. + * + * @return An {@code PObject} containing the number of elements in this {@code PCollection}. + */ + PObject<Long> length(); + + /** * Returns a shorthand name for this PCollection. */ String getName(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 9e5b6f2..a79ec2b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -166,6 +166,11 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public PObject<Long> length() { + return Aggregate.length(this); + } + + @Override public PCollection<S> sample(double acceptanceProbability) { return Sample.sample(this, acceptanceProbability); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 486b976..d4948c0 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -163,6 +163,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override + public PObject<Long> length() { + return Aggregate.length(this); + } + + @Override public PObject<S> max() { return Aggregate.max(this); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java index c2c56c0..a0588e0 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -20,6 +20,7 @@ package org.apache.crunch.lib; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; @@ -59,6 +60,25 @@ public class Aggregate { }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey().combineValues(CombineFn.<S> SUM_LONGS()); } + /** + * Returns the number of elements in the provided PCollection. + * + * @param collect The PCollection whose elements should be counted. + * @param <S> The type of the PCollection. + * @return A {@code PObject} containing the number of elements in the {@code PCollection}. 
+ */ + public static <S> PObject<Long> length(PCollection<S> collect) { + PTypeFamily tf = collect.getTypeFamily(); + PTable<Integer, Long> countTable = collect.parallelDo("Aggregate.count", + new MapFn<S, Pair<Integer, Long>>() { + public Pair<Integer, Long> map(S input) { + return Pair.of(1, 1L); + } + }, tf.tableOf(tf.ints(), tf.longs())).groupByKey().combineValues(CombineFn.<Integer> SUM_LONGS()); + PCollection<Long> count = countTable.values(); + return new FirstElementPObject<Long>(count); + } + public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> { private final boolean ascending;
