Updated Branches: refs/heads/master b681953c5 -> 0bec4e4e6
CRUNCH-158: Fix DoFn initialization in MemCollection and add support for Counters to the in-memory impl Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0bec4e4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0bec4e4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0bec4e4e Branch: refs/heads/master Commit: 0bec4e4e6f8bbd7a36c25129ebf2d5686e12616f Parents: b681953 Author: Josh Wills <[email protected]> Authored: Wed Jan 30 07:12:10 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Jan 30 07:12:10 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/impl/mem/MemPipeline.java | 8 ++++- .../crunch/impl/mem/collect/MemCollection.java | 22 +++++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 3e28a0c..95c9e72 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -28,7 +28,6 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; -import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; import org.apache.crunch.Target; import org.apache.crunch.impl.mem.collect.MemCollection; @@ -42,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -49,9 +49,13 @@ import com.google.common.collect.Lists; public class MemPipeline implements Pipeline { private static final Log LOG = LogFactory.getLog(MemPipeline.class); - + private static Counters COUNTERS = new Counters(); private static final MemPipeline INSTANCE = new MemPipeline(); + public static Counters getCounters() { + return COUNTERS; + } + public static Pipeline getInstance() { return INSTANCE; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index ffc38ae..defad1b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -103,7 +103,7 @@ public class MemCollection<S> implements PCollection<S> { public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type, ParallelDoOptions options) { InMemoryEmitter<T> emitter = new InMemoryEmitter<T>(); - doFn.initialize(); + doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); for (S s : collect) { doFn.process(s, emitter); } @@ -126,7 +126,6 @@ public class MemCollection<S> implements PCollection<S> { ParallelDoOptions options) { InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>(); doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); - doFn.initialize(); for (S s : collect) { doFn.process(s, emitter); } @@ -250,13 +249,26 @@ public class MemCollection<S> implements PCollection<S> { factory.setFilter(new MethodFilter() { @Override public 
boolean isHandled(Method m) { - return m.getName().equals("getConfiguration"); + String name = m.getName(); + return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name); } }); MethodHandler handler = new MethodHandler() { @Override - public Object invoke(Object arg0, Method arg1, Method arg2, Object[] arg3) throws Throwable { - return conf; + public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable { + String name = m.getName(); + if ("getConfiguration".equals(name)) { + return conf; + } else if ("progress".equals(name)) { + // no-op + return null; + } else { // getCounter + if (args.length == 1) { + return MemPipeline.getCounters().findCounter((Enum<?>) args[0]); + } else { + return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]); + } + } } }; try {
