jstorm-runner: support Flatten with empty inputs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aca16cc9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aca16cc9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aca16cc9

Branch: refs/heads/jstorm-runner
Commit: aca16cc9b2224b9bfce98719c6ef2abbad94f7df
Parents: 4d634ec
Author: Pei He <[email protected]>
Authored: Wed Jul 19 15:34:56 2017 +0800
Committer: Pei He <[email protected]>
Committed: Sat Aug 19 12:02:59 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/FlattenTranslator.java | 104 ++++++++++++++++++-
 1 file changed, 100 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aca16cc9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index 89708df..8f239bf 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -18,11 +18,24 @@
 package org.apache.beam.runners.jstorm.translation;
 
 import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 
 /**
  * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
@@ -40,10 +53,93 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
       PCollection<V> pc = (PCollection<V>) entry.getValue();
       inputs.putAll(pc.expand());
     }
-    System.out.println("Real inputs: " + inputs);
-    System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
     String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+
+    if (inputs.isEmpty()) {
+      // Create a empty source
+      TupleTag<?> tag = userGraphContext.getOutputTag();
+      PValue output = userGraphContext.getOutput();
+
+      UnboundedSourceSpout spout = new UnboundedSourceSpout(
+          description,
+          new EmptySource(),
+          userGraphContext.getOptions(),
+          tag);
+      context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+
+    } else {
+      FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+      context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+    }
+  }
+
+  private static class EmptySource extends UnboundedSource<Void, UnboundedSource.CheckpointMark> {
+    @Override
+    public List<? extends UnboundedSource<Void, CheckpointMark>> split(
+        int i, PipelineOptions pipelineOptions) throws Exception {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public UnboundedReader<Void> createReader(
+        PipelineOptions pipelineOptions,
+        @Nullable CheckpointMark checkpointMark) throws IOException {
+      return new EmptyReader();
+    }
+
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return null;
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+
+    private class EmptyReader extends UnboundedReader<Void> {
+      @Override
+      public boolean start() throws IOException {
+        return false;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return false;
+      }
+
+      @Override
+      public Void getCurrent() throws NoSuchElementException {
+        throw new NoSuchElementException();
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        throw new NoSuchElementException();
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return null;
+      }
+
+      @Override
+      public UnboundedSource<Void, ?> getCurrentSource() {
+        return EmptySource.this;
+      }
+    }
   }
 }
