http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java index c487578..77e4381 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java @@ -18,94 +18,101 @@ package org.apache.beam.runners.jstorm.translation.translator; import avro.shaded.com.google.common.collect.Maps; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.PValueBase; +import org.apache.beam.sdk.values.TupleTag; /** * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}. */ public class ParDoBoundMultiTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { - - @Override - public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); + extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { - Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); - Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { - Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); - localToExternalTupleTagMap.put(entry.getKey(), itr.next()); - } + @Override + public void translateNode( + ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { + final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag(); + PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); - sideOutputTags.remove(mainOutputTag); + Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); + Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); + for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { + Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); + localToExternalTupleTagMap.put(entry.getKey(), itr.next()); + } - Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - allOutputs); + TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); + List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); + sideOutputTags.remove(mainOutputTag); - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } + Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + String description = describeTransform( + transform, + allInputs, + allOutputs); - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new MultiStatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } else { - executor = new MultiOutputDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); - } + ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + DoFnExecutor executor; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + executor = new MultiStatefulDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + (DoFn<KV, OutputT>) transform.getFn(), + (Coder) WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<KV>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags, + localToExternalTupleTagMap); + } else { + executor = new MultiOutputDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + transform.getFn(), + WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags, + localToExternalTupleTagMap); } + + context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java index 3a952a9..7b998d9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java @@ -17,24 +17,25 @@ */ package org.apache.beam.runners.jstorm.translation.translator; -import java.util.List; -import java.util.Map; - import avro.shaded.com.google.common.collect.Lists; -import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; +import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import org.apache.beam.runners.jstorm.translation.TranslationContext; - +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,65 +43,68 @@ import org.slf4j.LoggerFactory; * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}. */ public class ParDoBoundTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { - - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); + extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { - @Override - public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<?> inputTag = userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); + @Override + public void translateNode( + ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { + final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + final TupleTag<?> inputTag = userGraphContext.getInputTag(); + PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - userGraphContext.getOutputs()); + TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); + List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } + Map<TupleTag<?>, PValue> allInputs = + avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + String description = describeTransform( + transform, + allInputs, + userGraphContext.getOutputs()); - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new StatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } else { - executor = new DoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<InputT>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } + ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + DoFnExecutor executor; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + executor = new StatefulDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + (DoFn<KV, OutputT>) transform.getFn(), + (Coder) WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<KV>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags); + } else { + executor = new DoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + transform.getFn(), + WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<InputT>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags); } + + context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java index 1ef1ec3..c450a22 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java @@ -19,6 +19,6 @@ package org.apache.beam.runners.jstorm.translation.translator; import org.apache.beam.sdk.transforms.Reshuffle; -public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> { - +public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> { + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java index 5b5a8e2..a15a8ba 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java @@ -17,76 +17,79 @@ */ package org.apache.beam.runners.jstorm.translation.translator; -import com.google.auto.value.AutoValue; - -import javax.annotation.Nullable; -import java.util.List; - import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.auto.value.AutoValue; +import java.util.List; +import javax.annotation.Nullable; + /** * Class that defines the stream connection between upstream and downstream components. */ @AutoValue public abstract class Stream { - public abstract Producer getProducer(); - public abstract Consumer getConsumer(); + public abstract Producer getProducer(); - public static Stream of(Producer producer, Consumer consumer) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream( - producer, consumer); + public abstract Consumer getConsumer(); + + public static Stream of(Producer producer, Consumer consumer) { + return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream( + producer, consumer); + } + + @AutoValue + public abstract static class Producer { + public abstract String getComponentId(); + + public abstract String getStreamId(); + + public abstract String getStreamName(); + + public static Producer of(String componentId, String streamId, String streamName) { + return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer( + componentId, streamId, streamName); } + } - @AutoValue - public abstract static class Producer { - public abstract String getComponentId(); - public abstract String getStreamId(); - public abstract String getStreamName(); + @AutoValue + public abstract static class Consumer { + public abstract String getComponentId(); - public static Producer of(String componentId, String streamId, String streamName) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer( - componentId, streamId, streamName); - } + public abstract Grouping getGrouping(); + + public static Consumer of(String componentId, Grouping grouping) { + return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer( + componentId, grouping); } + } + + @AutoValue + public abstract static class Grouping { + public abstract Type getType(); - @AutoValue - public abstract static class Consumer { - public abstract String getComponentId(); - public abstract Grouping getGrouping(); + @Nullable + public abstract List<String> getFields(); + + public static Grouping of(Type type) { + checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); + return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( + type, null /* fields */); + } - public static Consumer of(String componentId, Grouping grouping) { - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer( - componentId, grouping); - } + public static Grouping byFields(List<String> fields) { + checkNotNull(fields, "fields"); + checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!"); + return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( + Type.FIELDS, fields); } - @AutoValue - public abstract static class Grouping { - public abstract Type getType(); - - @Nullable - public abstract List<String> getFields(); - - public static Grouping of(Type type) { - checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( - type, null /* fields */); - } - - public static Grouping byFields(List<String> fields) { - checkNotNull(fields, "fields"); - checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!"); - return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping( - Type.FIELDS, fields); - } - - /** - * Types of stream groupings Storm allows - */ - public enum Type { - ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE - } + /** + * Types of stream groupings Storm allows + */ + public enum Type { + ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java index bebdf7b..487cac0 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java @@ -20,57 +20,57 @@ package org.apache.beam.runners.jstorm.translation.translator; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.FluentIterable; +import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.sdk.transforms.PTransform; - import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import java.util.Map; - /** * Interface for classes capable of tranforming Beam PTransforms into Storm primitives. */ public interface TransformTranslator<T extends PTransform<?, ?>> { - void translateNode(T transform, TranslationContext context); + void translateNode(T transform, TranslationContext context); - /** - * Returns true if this translator can translate the given transform. - */ - boolean canTranslate(T transform, TranslationContext context); + /** + * Returns true if this translator can translate the given transform. + */ + boolean canTranslate(T transform, TranslationContext context); - class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { - @Override - public void translateNode(T1 transform, TranslationContext context) { + class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { + @Override + public void translateNode(T1 transform, TranslationContext context) { - } + } - @Override - public boolean canTranslate(T1 transform, TranslationContext context) { - return true; - } + @Override + public boolean canTranslate(T1 transform, TranslationContext context) { + return true; + } - static String describeTransform( - PTransform<?, ?> transform, - Map<TupleTag<?>, PValue> inputs, - Map<TupleTag<?>, PValue> outputs) { - return String.format("%s --> %s --> %s", - Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { - return taggedPValue.getKey().getId(); - // return taggedPValue.getValue().getName(); - }})), - transform.getName(), - Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { - @Override - public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { - return taggedPvalue.getKey().getId(); - //return taggedPValue.getValue().getName(); - }}))); - } + static String describeTransform( + PTransform<?, ?> transform, + Map<TupleTag<?>, PValue> inputs, + Map<TupleTag<?>, PValue> outputs) { + return String.format("%s --> %s --> %s", + Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { + return taggedPValue.getKey().getId(); + // return taggedPValue.getValue().getName(); + } + })), + transform.getName(), + Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { + return taggedPvalue.getKey().getId(); + //return taggedPValue.getValue().getName(); + } + }))); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java index ac7d7bd..33ac024 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java @@ -17,30 +17,30 @@ */ package org.apache.beam.runners.jstorm.translation.translator; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; - /** * Translates a Read.Unbounded into a Storm spout. - * + * * @param <T> */ public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> { - public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - TupleTag<?> tag = userGraphContext.getOutputTag(); - PValue output = userGraphContext.getOutput(); + TupleTag<?> tag = userGraphContext.getOutputTag(); + PValue output = userGraphContext.getOutput(); - UnboundedSourceSpout spout = new UnboundedSourceSpout( - description, - transform.getSource(), userGraphContext.getOptions(), tag); - context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); - } + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + transform.getSource(), userGraphContext.getOptions(), tag); + context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java index 0ebf837..c55c8d6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.jstorm.translation.translator; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor; import org.apache.beam.sdk.coders.Coder; @@ -33,342 +37,342 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - /** * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner. */ -public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> { - @Override - public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag()); - context.addTransformExecutor(viewExecutor); +public class ViewTranslator + extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> { + @Override + public void translateNode( + CreateJStormPCollectionView<?, ?> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform( + transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(viewExecutor); + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} + * for the Flink runner in streaming mode. + */ + public static class ViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + @SuppressWarnings("unused") // used via reflection in JstormRunner#apply() + public ViewAsMap(View.AsMap<K, V> transform) { } - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Flink runner in streaming mode. - */ - public static class ViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - @SuppressWarnings("unused") // used via reflection in JstormRunner#apply() - public ViewAsMap(View.AsMap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } + @Override + public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // TODO: log warning as other runners. + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } + @Override + protected String getKindString() { + return "StreamingViewAsMap"; } + } + + /** + * Specialized expansion for {@link + * View.AsMultimap View.AsMultimap} for the + * Flink runner in streaming mode. + */ + public static class ViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { /** - * Specialized expansion for {@link - * View.AsMultimap View.AsMultimap} for the - * Flink runner in streaming mode. + * Builds an instance of this class from the overridden transform. */ - public static class ViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsMultimap(View.AsMultimap<K, V> transform) { - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // TODO: log warning as other runners. - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsMultimap(View.AsMultimap<K, V> transform) { + } - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } + @Override + public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // TODO: log warning as other runners. + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); } + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link View.AsList View.AsList} for the + * JStorm runner in streaming mode. + */ + public static class ViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { /** - * Specialized implementation for - * {@link View.AsList View.AsList} for the - * JStorm runner in streaming mode. + * Builds an instance of this class from the overridden transform. */ - public static class ViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsList(View.AsList<T> transform) {} - - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, List<T>>of(view)); - } + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsList(View.AsList<T> transform) { + } - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } + @Override + public PCollectionView<List<T>> expand(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<T, List<T>>of(view)); } + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + + /** + * Specialized implementation for + * {@link View.AsIterable View.AsIterable} for the + * JStorm runner in streaming mode. + */ + public static class ViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { /** - * Specialized implementation for - * {@link View.AsIterable View.AsIterable} for the - * JStorm runner in streaming mode. + * Builds an instance of this class from the overridden transform. */ - public static class ViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsIterable(View.AsIterable<T> transform) { } - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input, - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsIterable(View.AsIterable<T> transform) { } - /** - * Specialized expansion for - * {@link View.AsSingleton View.AsSingleton} for the - * JStorm runner in streaming mode. - */ - public static class ViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private View.AsSingleton<T> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() - public ViewAsSingleton(View.AsSingleton<T> transform) { - this.transform = transform; - } + @Override + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view)); + } - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } + /** + * Specialized expansion for + * {@link View.AsSingleton View.AsSingleton} for the + * JStorm runner in streaming mode. + */ + public static class ViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + private View.AsSingleton<T> transform; - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsSingleton(View.AsSingleton<T> transform) { + this.transform = transform; } - public static class CombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public CombineGloballyAsSingletonView( - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined, - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view)); - } + @Override + public PCollectionView<T> expand(PCollection<T> input) { + Combine.Globally<T, T> combine = Combine.globally( + new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } + @Override + protected String getKindString() { + return "StreamingViewAsSingleton"; } - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Collections.singletonList(c.element())); + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); } + } } + } + + public static class CombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + Combine.GloballyAsSingletonView<InputT, OutputT> transform; /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. + * Builds an instance of this class from the overridden transform. */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - private static final long serialVersionUID = 1L; + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public CombineGloballyAsSingletonView( + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + this.transform = transform; + } - @Override - public List<T> createAccumulator() { - return new ArrayList<>(); - } + @Override + public PCollectionView<OutputT> expand(PCollection<InputT> input) { + PCollection<OutputT> combined = + input.apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults() + .withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView( + combined, + combined.getWindowingStrategy(), + transform.getInsertDefault(), + transform.getInsertDefault() + ? transform.getCombineFn().defaultValue() : null, + combined.getCoder()); + return combined + .apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view)); + } - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } + @Override + protected String getKindString() { + return "StreamingCombineGloballyAsSingletonView"; + } + } - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(Collections.singletonList(c.element())); + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. + */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + private static final long serialVersionUID = 1L; - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } + @Override + public List<T> createAccumulator() { + return new ArrayList<>(); + } - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; } - /** - * Creates a primitive {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. - * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateJStormPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { - private PCollectionView<ViewT> view; + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } - private CreateJStormPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } - public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateJStormPCollectionView<>(view); - } + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + /** + * Creates a primitive {@link PCollectionView}. + * <p> + * <p>For internal use only by runner implementors. + * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateJStormPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private PCollectionView<ViewT> view; + + private CreateJStormPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } - @Override - public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { - return view; - } + public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateJStormPCollectionView<>(view); + } + + @Override + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { + return view; } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java index 0bf9a49..6de34dd 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java @@ -17,22 +17,22 @@ */ package org.apache.beam.runners.jstorm.translation.translator; +import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.runners.jstorm.translation.TranslationContext; - public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - context.getUserGraphContext().setWindowed(); - WindowAssignExecutor executor = new WindowAssignExecutor( - description, - transform.getWindowFn(), - userGraphContext.getOutputTag()); - context.addTransformExecutor(executor); - } + @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + context.getUserGraphContext().setWindowed(); + WindowAssignExecutor executor = new WindowAssignExecutor( + description, + transform.getWindowFn(), + userGraphContext.getOutputTag()); + context.addTransformExecutor(executor); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java index b67aff9..c863c9e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java @@ -21,27 +21,27 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Translates a Window.Bound node into a Storm WindowedBolt - * + * * @param <T> */ public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class); + private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class); - // Do nothing here currently. The assign of window strategy is included in AssignTranslator. - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - if (transform.getWindowFn() instanceof FixedWindows) { - context.getUserGraphContext().setWindowed(); - } else if (transform.getWindowFn() instanceof SlidingWindows) { - context.getUserGraphContext().setWindowed(); - } else { - throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn()); - } + // Do nothing here currently. The assign of window strategy is included in AssignTranslator. + @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + if (transform.getWindowFn() instanceof FixedWindows) { + context.getUserGraphContext().setWindowed(); + } else if (transform.getWindowFn() instanceof SlidingWindows) { + context.getUserGraphContext().setWindowed(); + } else { + throw new UnsupportedOperationException( + "Not supported window type currently: " + transform.getWindowFn()); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java index 07a3ad5..596d8b4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.jstorm.translation.util; public class CommonInstance { - public static final String KEY = "Key"; - public static final String VALUE = "Value"; + public static final String KEY = "Key"; + public static final String VALUE = "Value"; - public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; + public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java index 87562fd..750095e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java @@ -17,30 +17,29 @@ */ package org.apache.beam.runners.jstorm.translation.util; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import java.io.Serializable; +import javax.annotation.Nullable; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; -import javax.annotation.Nullable; -import java.io.Serializable; - /** * No-op SideInputReader implementation. */ public class DefaultSideInputReader implements SideInputReader, Serializable { - @Nullable - @Override - public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) { - return null; - } + @Nullable + @Override + public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) { + return null; + } - @Override - public <T> boolean contains(PCollectionView<T> pCollectionView) { - return false; - } + @Override + public <T> boolean contains(PCollectionView<T> pCollectionView) { + return false; + } - @Override - public boolean isEmpty() { - return true; - } + @Override + public boolean isEmpty() { + return true; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java index 481b7fb..4eb1d8f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java @@ -17,73 +17,74 @@ */ package org.apache.beam.runners.jstorm.translation.util; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * Default StepContext for running DoFn This does not allow accessing state or timer internals. */ public class DefaultStepContext implements ExecutionContext.StepContext { - private TimerInternals timerInternals; + private TimerInternals timerInternals; - private StateInternals stateInternals; + private StateInternals stateInternals; - public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { - this.timerInternals = checkNotNull(timerInternals, "timerInternals"); - this.stateInternals = checkNotNull(stateInternals, "stateInternals"); - } + public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { + this.timerInternals = checkNotNull(timerInternals, "timerInternals"); + this.stateInternals = checkNotNull(stateInternals, "stateInternals"); + } - @Override - public String getStepName() { - return null; - } + @Override + public String getStepName() { + return null; + } - @Override - public String getTransformName() { - return null; - } + @Override + public String getTransformName() { + return null; + } - @Override - public void noteOutput(WindowedValue<?> windowedValue) { + @Override + public void noteOutput(WindowedValue<?> windowedValue) { - } + } - @Override - public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { + @Override + public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { - } + } - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException { - throw new UnsupportedOperationException("Writing side-input data is not supported."); - } + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) + throws IOException { + throw new UnsupportedOperationException("Writing side-input data is not supported."); + } - @Override - public StateInternals stateInternals() { - return stateInternals; - } + @Override + public StateInternals stateInternals() { + return stateInternals; + } - @Override - public TimerInternals timerInternals() { - return timerInternals; - } + @Override + public TimerInternals timerInternals() { + return timerInternals; + } - public void setStateInternals(StateInternals stateInternals) { - this.stateInternals = stateInternals; - } + public void setStateInternals(StateInternals stateInternals) { + this.stateInternals = stateInternals; + } - public void setTimerInternals(TimerInternals timerInternals) { - this.timerInternals = timerInternals; - } + public void setTimerInternals(TimerInternals timerInternals) { + this.timerInternals = timerInternals; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java index cbf815a..9fd62e4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java @@ -17,37 +17,37 @@ */ package org.apache.beam.runners.jstorm.util; +import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.jstorm.translation.runtime.Executor; - import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; -import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; public class RunnerUtils { - /** - * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>> - * @param elem - * @return - */ - public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { - WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem; - SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( - kvElem.getValue().getKey(), - kvElem.withValue(kvElem.getValue().getValue())); - return workItem; - } + /** + * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>> + * + * @param elem + * @return + */ + public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { + WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem; + SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( + kvElem.getValue().getKey(), + kvElem.withValue(kvElem.getValue().getValue())); + return workItem; + } - public static boolean isGroupByKeyExecutor (Executor executor) { - if (executor instanceof GroupByWindowExecutor) { - return true; - } else if (executor instanceof StatefulDoFnExecutor || - executor instanceof MultiStatefulDoFnExecutor) { - return true; - } else { - return false; - } + public static boolean isGroupByKeyExecutor(Executor executor) { + if (executor instanceof GroupByWindowExecutor) { + return true; + } else if (executor instanceof StatefulDoFnExecutor || + executor instanceof MultiStatefulDoFnExecutor) { + return true; + } else { + return false; } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java index 391699b..182794f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java @@ -18,47 +18,48 @@ package org.apache.beam.runners.jstorm.util; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.beam.sdk.options.PipelineOptions; +import static com.google.common.base.Preconditions.checkNotNull; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; - -import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.beam.sdk.options.PipelineOptions; /** * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. */ public class SerializedPipelineOptions implements Serializable { - private final byte[] serializedOptions; + private final byte[] serializedOptions; - /** Lazily initialized copy of deserialized options */ - private transient PipelineOptions pipelineOptions; + /** + * Lazily initialized copy of deserialized options + */ + private transient PipelineOptions pipelineOptions; - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } + public SerializedPipelineOptions(PipelineOptions options) { + checkNotNull(options, "PipelineOptions must not be null."); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + new ObjectMapper().writeValue(baos, options); + this.serializedOptions = baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Couldn't serialize PipelineOptions.", e); } - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } + } - return pipelineOptions; + public PipelineOptions getPipelineOptions() { + if (pipelineOptions == null) { + try { + pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + } } + return pipelineOptions; + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java index dee5f1a..cce21b3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.util.WindowedValue; /** * Singleton keyed word item. + * * @param <K> * @param <ElemT> */ @@ -38,7 +39,7 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> } public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) { - return new SingletonKeyedWorkItem<K, ElemT>(key, value); + return new SingletonKeyedWorkItem<K, ElemT>(key, value); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java index 344d3c7..0d6fc23 100644 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java @@ -34,7 +34,7 @@ public class JStormRunnerRegistrarTest { @Test public void testFullName() { String[] args = - new String[] {String.format("--runner=%s", JStormRunner.class.getName())}; + new String[]{String.format("--runner=%s", JStormRunner.class.getName())}; PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); assertEquals(opts.getRunner(), JStormRunner.class); } @@ -42,7 +42,7 @@ public class JStormRunnerRegistrarTest { @Test public void testClassName() { String[] args = - new String[] {String.format("--runner=%s", JStormRunner.class.getSimpleName())}; + new String[]{String.format("--runner=%s", JStormRunner.class.getSimpleName())}; PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); assertEquals(opts.getRunner(), JStormRunner.class); }
