Updated Branches: refs/heads/master 22a33dd56 -> 883c565a3
CRUNCH-71 - Initialize PType MapFns for deep copy Add an initialization of the input and output MapFns on PTypes so that they can be reliably used for deep copying of values within DoFns. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/883c565a Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/883c565a Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/883c565a Branch: refs/heads/master Commit: 883c565a31c914dd004c649fa46df1e47fb85bba Parents: 22a33dd Author: Gabriel Reid <[email protected]> Authored: Thu Sep 20 14:26:40 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Sep 20 20:21:26 2012 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Aggregate.java | 7 +++++- .../apache/crunch/lib/join/FullOuterJoinFn.java | 1 + .../org/apache/crunch/lib/join/InnerJoinFn.java | 1 + .../java/org/apache/crunch/lib/join/JoinFn.java | 6 +++++ .../apache/crunch/lib/join/LeftOuterJoinFn.java | 1 + .../apache/crunch/lib/join/RightOuterJoinFn.java | 1 + .../main/java/org/apache/crunch/types/PType.java | 8 ++++++ .../crunch/types/avro/AvroGroupedTableType.java | 5 ++++ .../org/apache/crunch/types/avro/AvroType.java | 5 ++++ .../types/writable/WritableGroupedTableType.java | 5 ++++ .../crunch/types/writable/WritableTableType.java | 6 +++++ .../apache/crunch/types/writable/WritableType.java | 14 +++++++++++ .../writable/WritableGroupedTableTypeTest.java | 1 + .../types/writable/WritableTableTypeTest.java | 2 +- .../crunch/types/writable/WritableTypeTest.java | 18 +++++++++++--- 15 files changed, 75 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java 
---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java index a0588e0..dc3de7c 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -20,7 +20,6 @@ package org.apache.crunch.lib; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; @@ -245,6 +244,12 @@ public class Aggregate { PTypeFamily tf = collect.getTypeFamily(); final PType<V> valueType = collect.getValueType(); return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() { + + @Override + public void initialize() { + valueType.initialize(); + } + public Collection<V> map(Iterable<V> values) { List<V> collected = Lists.newArrayList(); for (V value : values) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java index 3c63f07..0ceb382 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java @@ -48,6 +48,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { /** {@inheritDoc} */ @Override public void initialize() { + super.initialize(); lastId = 1; lastKey = null; this.leftValues = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java index bbcc35f..5275c95 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java @@ -47,6 +47,7 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> { /** {@inheritDoc} */ @Override public void initialize() { + super.initialize(); lastKey = null; this.leftValues = Lists.newArrayList(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java index f45ce9c..dab6c34 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java @@ -51,6 +51,12 @@ public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterab this.leftValueType = leftValueType; } + @Override + public void initialize() { + this.keyType.initialize(); + this.leftValueType.initialize(); + } + /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). 
*/ public abstract String getJoinType(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java index 272e081..116353a 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java @@ -48,6 +48,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { /** {@inheritDoc} */ @Override public void initialize() { + super.initialize(); lastId = 1; lastKey = null; this.leftValues = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java index 2dbb2f9..51b74cc 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java @@ -47,6 +47,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { /** {@inheritDoc} */ @Override public void initialize() { + super.initialize(); lastKey = null; this.leftValues = Lists.newArrayList(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/PType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java index a60ce62..bbe8a4b 100644 --- a/crunch/src/main/java/org/apache/crunch/types/PType.java 
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java @@ -52,6 +52,14 @@ public interface PType<T> extends Serializable { Converter getConverter(); /** + * Initialize this PType for use within a DoFn. This generally only needs to + * be called when using a PType for {@link #getDetachedValue(Object)}. + * + * @see PType#getDetachedValue(Object) + */ + void initialize(); + + /** * Returns a copy of a value (or the value itself) that can safely be * retained. * <p> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index ab68e71..aa5b5dc 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -72,6 +72,11 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { } @Override + public void initialize() { + // No initialization needed for Avro PTypes + } + + @Override public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) { return PTables.getGroupedDetachedValue(this, value); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java index 4997157..a127baa 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -163,6 +163,11 @@ public class AvroType<T> implements PType<T> { return new 
AvroFileSourceTarget<T>(path, this); } + @Override + public void initialize() { + // No initialization needed for Avro PTypes + } + public T getDetachedValue(T value) { return deepCopier.deepCopy(value); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java index 3c9312a..98afb4d 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java @@ -61,6 +61,11 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> { } @Override + public void initialize() { + this.tableType.initialize(); + } + + @Override public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) { return PTables.getGroupedDetachedValue(this, value); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java index fc6dd04..6bb6c5d 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java @@ -101,6 +101,12 @@ class WritableTableType<K, V> implements PTableType<K, V> { } @Override + public void initialize() { + keyType.initialize(); + valueType.initialize(); + } + + @Override public Pair<K, V> getDetachedValue(Pair<K, V> value) { return 
PTables.getDetachedValue(this, value); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java index 23c95ea..71f81f4 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java @@ -41,6 +41,7 @@ public class WritableType<T, W extends Writable> implements PType<T> { private final MapFn<T, W> outputFn; private final DeepCopier<W> deepCopier; private final List<PType> subTypes; + private boolean initialized = false; WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn, PType... subTypes) { @@ -102,7 +103,20 @@ public class WritableType<T, W extends Writable> implements PType<T> { } @Override + public void initialize() { + this.inputFn.initialize(); + this.outputFn.initialize(); + for (PType subType : subTypes) { + subType.initialize(); + } + this.initialized = true; + } + + @Override public T getDetachedValue(T value) { + if (!initialized) { + throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType"); + } W writableValue = outputFn.map(value); W deepCopy = this.deepCopier.deepCopy(writableValue); return inputFn.map(deepCopy); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java index 3596f13..1699f3c 100644 --- 
a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java @@ -41,6 +41,7 @@ public class WritableGroupedTableTypeTest { PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class)).getGroupedTableType(); + groupedTableType.initialize(); Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java index f27e7b7..ae68e7a 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java @@ -34,7 +34,7 @@ public class WritableTableTypeTest { Pair<Integer, Text> pair = Pair.of(integerValue, textValue); WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class)); - + tableType.initialize(); Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair); assertSame(integerValue, detachedPair.first()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java index 51a87f5..bea953d 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java +++ 
b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java @@ -19,8 +19,6 @@ package org.apache.crunch.types.writable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; import java.util.Collection; import java.util.Map; @@ -35,9 +33,20 @@ import com.google.common.collect.Maps; public class WritableTypeTest { + @Test(expected = IllegalStateException.class) + public void testGetDetachedValue_NotInitialized() { + WritableType<Text, Text> textWritableType = Writables.writables(Text.class); + Text value = new Text("test"); + + // Calling getDetachedValue without first calling initialize should throw an + // exception + textWritableType.getDetachedValue(value); + } + @Test public void testGetDetachedValue_CustomWritable() { WritableType<Text, Text> textWritableType = Writables.writables(Text.class); + textWritableType.initialize(); Text value = new Text("test"); Text detachedValue = textWritableType.getDetachedValue(value); @@ -50,6 +59,7 @@ public class WritableTypeTest { Collection<Text> textCollection = Lists.newArrayList(new Text("value")); WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables .writables(Text.class)); + ptype.initialize(); Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection); assertEquals(textCollection, detachedCollection); @@ -61,8 +71,7 @@ public class WritableTypeTest { Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two")); WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class), Writables.writables(Text.class)); - ptype.getOutputMapFn().initialize(); - ptype.getInputMapFn().initialize(); + ptype.initialize(); Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair); assertEquals(textPair, detachedPair); @@ -76,6 +85,7 @@ public class WritableTypeTest { 
stringTextMap.put("key", new Text("value")); WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class)); + ptype.initialize(); Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap); assertEquals(stringTextMap, detachedMap);
