Use AutoValue for ExecutorUpdate Explicitly provide the collections the Bundle should be consumed by in the update.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59cca8dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59cca8dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59cca8dd Branch: refs/heads/master Commit: 59cca8ddae3d544beea9684719409efe3acbe634 Parents: e7df160 Author: Thomas Groh <tg...@google.com> Authored: Fri May 6 10:33:32 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri May 6 10:55:53 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 59 ++++++++++---------- 1 file changed, 30 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59cca8dd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 6f26b6b..fd4cc2c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; +import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; @@ -191,8 +192,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { return keyedPValues.contains(pvalue); } - private void scheduleConsumers(CommittedBundle<?> bundle) { - for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) { + private void scheduleConsumers(ExecutorUpdate update) { + CommittedBundle<?> bundle = update.getBundle().get(); + for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) { scheduleConsumption(consumer, bundle, defaultCompletionCallback); } } @@ -225,7 +227,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { CommittedBundle<?> inputBundle, InProcessTransformResult result) { CommittedResult committedResult = getCommittedResult(inputBundle, result); for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, + valueToConsumers.get(outputBundle.getPCollection()))); } return committedResult; } @@ -276,38 +279,36 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { * * Used to signal when the executor should be shut down (due to an exception). */ - private static class ExecutorUpdate { - private final Optional<? extends CommittedBundle<?>> bundle; - private final Optional<? extends Throwable> throwable; - - public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) { - return new ExecutorUpdate(bundle, null); + @AutoValue + abstract static class ExecutorUpdate { + public static ExecutorUpdate fromBundle( + CommittedBundle<?> bundle, + Collection<AppliedPTransform<?, ?, ?>> consumers) { + return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( + Optional.of(bundle), + consumers, + Optional.<Throwable>absent()); } public static ExecutorUpdate fromThrowable(Throwable t) { - return new ExecutorUpdate(null, t); - } - - private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) { - this.bundle = Optional.fromNullable(producedBundle); - this.throwable = Optional.fromNullable(throwable); + return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( + Optional.<CommittedBundle<?>>absent(), + Collections.<AppliedPTransform<?, ?, ?>>emptyList(), + Optional.of(t)); } - public Optional<? extends CommittedBundle<?>> getBundle() { - return bundle; - } + /** + * Returns the bundle that produced this update. + */ + public abstract Optional<? extends CommittedBundle<?>> getBundle(); - public Optional<? extends Throwable> getException() { - return throwable; - } + /** + * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return + * a present {@link Optional}. + */ + public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers(); - @Override - public String toString() { - return MoreObjects.toStringHelper(ExecutorUpdate.class) - .add("bundle", bundle) - .add("exception", throwable) - .toString(); - } + public abstract Optional<? extends Throwable> getException(); } /** @@ -353,7 +354,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { while (update != null) { LOG.debug("Executor Update: {}", update); if (update.getBundle().isPresent()) { - scheduleConsumers(update.getBundle().get()); + scheduleConsumers(update); } else if (update.getException().isPresent()) { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); }