Updated Branches: refs/heads/master 6536f4c90 -> 3e9d6ed64
CRUNCH-157: Make setContext and initialize separate function calls in RTNode and elsewhere Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3e9d6ed6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3e9d6ed6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3e9d6ed6 Branch: refs/heads/master Commit: 3e9d6ed64315ebf0a4d6f8081ca3c9102c027a70 Parents: 6536f4c Author: Josh Wills <[email protected]> Authored: Sun Feb 3 10:18:13 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Feb 4 19:03:11 2013 -0800 ---------------------------------------------------------------------- crunch/src/main/java/org/apache/crunch/DoFn.java | 1 - .../src/main/java/org/apache/crunch/FilterFn.java | 22 ++++++- .../java/org/apache/crunch/fn/CompositeMapFn.java | 11 +++- .../java/org/apache/crunch/fn/ExtractKeyFn.java | 8 ++- .../main/java/org/apache/crunch/fn/PairMapFn.java | 13 +++- .../crunch/impl/mem/collect/MemCollection.java | 2 + .../java/org/apache/crunch/impl/mr/run/RTNode.java | 1 + .../java/org/apache/crunch/lib/SecondarySort.java | 1 + .../org/apache/crunch/types/PGroupedTableType.java | 10 +++- .../apache/crunch/types/avro/AvroTableType.java | 21 +++++- .../java/org/apache/crunch/types/avro/Avros.java | 49 ++++++++++++-- .../apache/crunch/types/writable/Writables.java | 50 ++++++++++++--- .../org/apache/crunch/types/avro/AvrosTest.java | 4 +- 13 files changed, 160 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java index 8d7cc17..2c6389a 100644 --- 
a/crunch/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch/src/main/java/org/apache/crunch/DoFn.java @@ -103,7 +103,6 @@ public abstract class DoFn<S, T> implements Serializable { */ public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { this.context = context; - initialize(); } /** http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/FilterFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java index d635b66..440f122 100644 --- a/crunch/src/main/java/org/apache/crunch/FilterFn.java +++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java @@ -91,10 +91,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> { for (FilterFn<S> fn : fns) { fn.setContext(context); } - initialize(); } @Override + public void initialize() { + for (FilterFn<S> fn : fns) { + fn.initialize(); + } + } + + @Override public void cleanup() { for (FilterFn<S> fn : fns) { fn.cleanup(); @@ -151,7 +157,13 @@ public abstract class FilterFn<T> extends DoFn<T, T> { for (FilterFn<S> fn : fns) { fn.setContext(context); } - initialize(); + } + + @Override + public void initialize() { + for (FilterFn<S> fn : fns) { + fn.initialize(); + } } @Override @@ -207,7 +219,11 @@ public abstract class FilterFn<T> extends DoFn<T, T> { @Override public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { base.setContext(context); - initialize(); + } + + @Override + public void initialize() { + base.initialize(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java index 4714fe4..2a8e7d9 
100644 --- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java +++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java @@ -20,6 +20,7 @@ package org.apache.crunch.fn; import org.apache.crunch.Emitter; import org.apache.crunch.MapFn; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; public class CompositeMapFn<R, S, T> extends MapFn<R, T> { @@ -32,9 +33,15 @@ public class CompositeMapFn<R, S, T> extends MapFn<R, T> { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + first.setContext(context); + second.setContext(context); + } + + @Override public void initialize() { - first.setContext(getContext()); - second.setContext(getContext()); + first.initialize(); + second.initialize(); } public MapFn<R, S> getFirst() { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java index 99ce277..b8cc9df 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java +++ b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java @@ -19,6 +19,7 @@ package org.apache.crunch.fn; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** * Wrapper function for converting a {@code MapFn} into a key-value pair that is @@ -33,8 +34,13 @@ public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - this.mapFn.setContext(getContext()); + mapFn.initialize(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java index b25a6d8..9ee4336 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java +++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java @@ -21,6 +21,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> { @@ -39,12 +40,18 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> { } @Override - public void initialize() { - keys.setContext(getContext()); - values.setContext(getContext()); + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + keys.setContext(context); + values.setContext(context); } @Override + public void initialize() { + keys.initialize(); + values.initialize(); + } + + @Override public Pair<S, T> map(Pair<K, V> input) { return Pair.of(keys.map(input.first()), values.map(input.second())); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index defad1b..cc9f3fc 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -104,6 +104,7 @@ public class MemCollection<S> implements PCollection<S> { ParallelDoOptions 
options) { InMemoryEmitter<T> emitter = new InMemoryEmitter<T>(); doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); + doFn.initialize(); for (S s : collect) { doFn.process(s, emitter); } @@ -126,6 +127,7 @@ public class MemCollection<S> implements PCollection<S> { ParallelDoOptions options) { InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>(); doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); + doFn.initialize(); for (S s : collect) { doFn.process(s, emitter); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index e30980d..ce7b795 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -64,6 +64,7 @@ public class RTNode implements Serializable { } fn.setContext(ctxt.getContext()); + fn.initialize(); for (RTNode child : children) { child.initialize(ctxt); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java index 30639b1..54b4396 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java +++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -102,6 +102,7 @@ public class SecondarySort { @Override public void initialize() { intern.setContext(getContext()); + intern.initialize(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java index c222c89..e9c773c 100644 --- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java @@ -28,6 +28,7 @@ import org.apache.crunch.SourceTarget; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.Iterables; @@ -87,10 +88,15 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable< values.configure(conf); } + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + keys.setContext(context); + values.setContext(context); + } + @Override public void initialize() { - keys.setContext(getContext()); - values.setContext(getContext()); + keys.initialize(); + values.initialize(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 31dbd74..86613df 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -27,6 +27,7 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.TupleDeepCopier; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.mapreduce.TaskInputOutputContext; /** * The implementation of the PTableType interface for Avro-based serialization. @@ -57,9 +58,15 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + keyMapFn.setContext(context); + valueMapFn.setContext(context); + } + + @Override public void initialize() { - keyMapFn.setContext(getContext()); - valueMapFn.setContext(getContext()); + keyMapFn.initialize(); + valueMapFn.initialize(); pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema( new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString(); } @@ -93,9 +100,15 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + firstMapFn.setContext(context); + secondMapFn.setContext(context); + } + + @Override public void initialize() { - firstMapFn.setContext(getContext()); - secondMapFn.setContext(getContext()); + firstMapFn.initialize(); + secondMapFn.initialize(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index c8a2ef5..fc30eaf 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -56,6 +56,7 @@ import org.apache.crunch.types.TupleFactory; import org.apache.crunch.types.writable.WritableDeepCopier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.util.ReflectionUtils; import 
com.google.common.collect.ImmutableList; @@ -292,8 +293,13 @@ public class Avros { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - this.mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -331,8 +337,13 @@ public class Avros { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - this.mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -369,8 +380,13 @@ public class Avros { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - this.mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -396,8 +412,13 @@ public class Avros { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - this.mapFn.setContext(getContext()); + this.mapFn.initialize(); } @Override @@ -441,9 +462,16 @@ public class Avros { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override public void initialize() { for (MapFn fn : fns) { - fn.setContext(getContext()); + fn.initialize(); } this.values = new Object[fns.size()]; tupleFactory.initialize(); @@ -499,12 +527,19 @@ public class Avros { fn.configure(conf); } } - + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + @Override public void initialize() { this.schema = new Schema.Parser().parse(jsonSchema); for (MapFn fn : fns) { - fn.setContext(getContext()); + fn.initialize(); } }
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java index 67f0621..25ae370 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -48,6 +48,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -297,9 +298,16 @@ public class Writables { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override public void initialize() { for (MapFn fn : fns) { - fn.setContext(getContext()); + fn.initialize(); } // The rest of the methods allocate new // objects each time. 
However this one @@ -346,13 +354,19 @@ public class Writables { } } - + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + @Override public void initialize() { this.values = new Writable[fns.size()]; this.writable = new TupleWritable(values); for (MapFn fn : fns) { - fn.setContext(getContext()); + fn.initialize(); } } @@ -426,10 +440,14 @@ public class Writables { mapFn.configure(conf); } - + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + @Override public void initialize() { - mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -457,10 +475,14 @@ public class Writables { mapFn.configure(conf); } - + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + @Override public void initialize() { - mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -495,8 +517,13 @@ public class Writables { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - mapFn.setContext(getContext()); + mapFn.initialize(); } @Override @@ -525,8 +552,13 @@ public class Writables { } @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + mapFn.setContext(context); + } + + @Override public void initialize() { - mapFn.setContext(getContext()); + mapFn.initialize(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java index dabf0fe..5622a56 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java 
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java @@ -192,8 +192,10 @@ public class AvrosTest { TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration()); at.getInputMapFn().setContext(testContext); + at.getInputMapFn().initialize(); at.getOutputMapFn().setContext(testContext); - + at.getOutputMapFn().initialize(); + LongWritable lw = new LongWritable(1729L); assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw))); }
