http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java index d907fac..6d6f1c6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java @@ -17,15 +17,17 @@ */ package org.apache.beam.runners.jstorm.translation; -import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.util.List; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; @@ -34,144 +36,151 @@ import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; - -import java.util.List; - /** * Pipleline translator of Storm */ public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { - private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class); - private TranslationContext context; - private int depth = 0; - - public StormPipelineTranslator(TranslationContext context) { - this.context = context; - } - - public void translate(Pipeline pipeline) { - List<PTransformOverride> transformOverrides = - ImmutableList.<PTransformOverride>builder() - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class)))) - .build(); - pipeline.replaceAll(transformOverrides); - pipeline.traverseTopologically(this); - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); - this.depth++; - - // check if current composite transforms need to be translated. - // If not, all sub transforms will be translated in visitPrimitiveTransform. - PTransform<?, ?> transform = node.getTransform(); - if (transform != null) { - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - - if (translator != null && applyCanTranslate(transform, node, translator)) { - applyStreamingTransform(transform, node, translator); - LOG.info(genSpaces(this.depth) + "translated-" + node); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node); + private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class); + private TranslationContext context; + private int depth = 0; + + public StormPipelineTranslator(TranslationContext context) { + this.context = context; + } + + public void translate(Pipeline pipeline) { + List<PTransformOverride> transformOverrides = + ImmutableList.<PTransformOverride>builder() + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new ReflectiveOneToOneOverrideFactory( + (ViewTranslator.CombineGloballyAsSingletonView.class)))) + .build(); + pipeline.replaceAll(transformOverrides); + pipeline.traverseTopologically(this); + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); + this.depth++; + + // check if current composite transforms need to be translated. + // If not, all sub transforms will be translated in visitPrimitiveTransform. + PTransform<?, ?> transform = node.getTransform(); + if (transform != null) { + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); + + if (translator != null && applyCanTranslate(transform, node, translator)) { + applyStreamingTransform(transform, node, translator); + LOG.info(genSpaces(this.depth) + "translated-" + node); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } } - - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node); - - if (!node.isRootNode()) { - PTransform<?, ?> transform = node.getTransform(); - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - if (translator == null || !applyCanTranslate(transform, node, translator)) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); - } - applyStreamingTransform(transform, node, translator); - } + return CompositeBehavior.ENTER_TRANSFORM; + } + + public void leaveCompositeTransform(TransformHierarchy.Node node) { + this.depth--; + LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node); + } + + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node); + + if (!node.isRootNode()) { + PTransform<?, ?> transform = node.getTransform(); + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); + if (translator == null || !applyCanTranslate(transform, node, translator)) { + LOG.info(node.getTransform().getClass().toString()); + throw new UnsupportedOperationException( + "The transform " + transform + " is currently not supported."); + } + applyStreamingTransform(transform, node, translator); } - - public void visitValue(PValue value, TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visiting value {}", value); + } + + public void visitValue(PValue value, TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "visiting value {}", value); + } + + private <T extends PTransform<?, ?>> void applyStreamingTransform( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + + context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); + typedTranslator.translateNode(typedTransform, context); + + // Maintain PValue to TupleTag map for side inputs translation. + context.getUserGraphContext().recordOutputTaggedPValue(); + } + + private <T extends PTransform<?, ?>> boolean applyCanTranslate( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + + context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); + + return typedTranslator.canTranslate(typedTransform, context); + } + + /** + * Utility formatting method. + * + * @param n number of spaces to generate + * @return String with "|" followed by n spaces + */ + protected static String genSpaces(int n) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < n; i++) { + builder.append("| "); } - - private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node, - TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, context); - - // Maintain PValue to TupleTag map for side inputs translation. - context.getUserGraphContext().recordOutputTaggedPValue(); + return builder.toString(); + } + + private static class ReflectiveOneToOneOverrideFactory< + InputT extends PValue, + OutputT extends PValue, + TransformT extends PTransform<InputT, OutputT>> + extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { + private final Class<PTransform<InputT, OutputT>> replacement; + + private ReflectiveOneToOneOverrideFactory( + Class<PTransform<InputT, OutputT>> replacement) { + this.replacement = replacement; } - private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - - return typedTranslator.canTranslate(typedTransform, context); - } - - /** - * Utility formatting method. - * - * @param n number of spaces to generate - * @return String with "|" followed by n spaces - */ - protected static String genSpaces(int n) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < n; i++) { - builder.append("| "); - } - return builder.toString(); - } - - private static class ReflectiveOneToOneOverrideFactory< - InputT extends PValue, - OutputT extends PValue, - TransformT extends PTransform<InputT, OutputT>> - extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { - private final Class<PTransform<InputT, OutputT>> replacement; - - private ReflectiveOneToOneOverrideFactory( - Class<PTransform<InputT, OutputT>> replacement) { - this.replacement = replacement; - } - - @Override - public PTransformReplacement<InputT, OutputT> getReplacementTransform(AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) { - PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform(); - PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement) - .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), originalPTransform) - .build(); - InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values()); - return PTransformReplacement.of(inputT, replacedPTransform); - } + @Override + public PTransformReplacement<InputT, OutputT> getReplacementTransform( + AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) { + PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform(); + PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement) + .withArg( + (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), + originalPTransform) + .build(); + InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values()); + return PTransformReplacement.of(inputT, replacedPTransform); } + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 707202b..526352a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -17,19 +17,29 @@ */ package org.apache.beam.runners.jstorm.translation; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import avro.shaded.com.google.common.collect.Lists; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.translator.Stream; -import org.apache.beam.runners.jstorm.util.RunnerUtils; -import com.google.common.base.Strings; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import com.google.common.collect.FluentIterable; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; import org.apache.beam.runners.jstorm.translation.runtime.Executor; +import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.translator.Stream; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import org.apache.beam.runners.jstorm.util.RunnerUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.PValueBase; @@ -38,387 +48,392 @@ import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; - -import java.util.*; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - /** * Maintains the state necessary during Pipeline translation to build a Storm topology. */ public class TranslationContext { - private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class); - - private final UserGraphContext userGraphContext; - private final ExecutionGraphContext executionGraphContext; - - public TranslationContext(JStormPipelineOptions options) { - this.userGraphContext = new UserGraphContext(options); - this.executionGraphContext = new ExecutionGraphContext(); + private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class); + + private final UserGraphContext userGraphContext; + private final ExecutionGraphContext executionGraphContext; + + public TranslationContext(JStormPipelineOptions options) { + this.userGraphContext = new UserGraphContext(options); + this.executionGraphContext = new ExecutionGraphContext(); + } + + public ExecutionGraphContext getExecutionGraphContext() { + return executionGraphContext; + } + + public UserGraphContext getUserGraphContext() { + return userGraphContext; + } + + private void addStormStreamDef( + TaggedPValue input, String destComponentName, Stream.Grouping grouping) { + Stream.Producer producer = executionGraphContext.getProducer(input.getValue()); + if (!producer.getComponentId().equals(destComponentName)) { + Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping); + executionGraphContext.registerStreamConsumer(consumer, producer); + + ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId()); + if (executorsBolt != null) { + executorsBolt.addExternalOutputTag(input.getTag()); + } } - - public ExecutionGraphContext getExecutionGraphContext() { - return executionGraphContext; + } + + private String getUpstreamExecutorsBolt() { + for (PValue value : userGraphContext.getInputs().values()) { + String componentId = executionGraphContext.getProducerComponentId(value); + if (componentId != null && executionGraphContext.getBolt(componentId) != null) { + return componentId; + } } - - public UserGraphContext getUserGraphContext() { - return userGraphContext; + // When upstream component is spout, "null" will be return. + return null; + } + + /** + * check if the current transform is applied to source collection. + * + * @return + */ + private boolean connectedToSource() { + for (PValue value : userGraphContext.getInputs().values()) { + if (executionGraphContext.producedBySpout(value)) { + return true; + } + } + return false; + } + + /** + * @param upstreamExecutorsBolt + * @return true if there is multiple input streams, or upstream executor output the same stream + * to different executors + */ + private boolean isMultipleInputOrOutput( + ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) { + if (inputs.size() > 1) { + return true; + } else { + final Sets.SetView<TupleTag> intersection = + Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet()); + if (!intersection.isEmpty()) { + // there is already a different executor consume the same input + return true; + } else { + return false; + } } + } - private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) { - Stream.Producer producer = executionGraphContext.getProducer(input.getValue()); - if (!producer.getComponentId().equals(destComponentName)) { - Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping); - executionGraphContext.registerStreamConsumer(consumer, producer); + public void addTransformExecutor(Executor executor) { + addTransformExecutor(executor, Collections.EMPTY_LIST); + } - ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId()); - if (executorsBolt != null) { - executorsBolt.addExternalOutputTag(input.getTag()); - } - } - } + public void addTransformExecutor(Executor executor, List<PValue> sideInputs) { + addTransformExecutor( + executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs); + } - private String getUpstreamExecutorsBolt() { - for (PValue value : userGraphContext.getInputs().values()) { - String componentId = executionGraphContext.getProducerComponentId(value); - if (componentId != null && executionGraphContext.getBolt(componentId) != null) { - return componentId; - } - } - // When upstream component is spout, "null" will be return. - return null; - } + public void addTransformExecutor( + Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) { + addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST); + } - /** - * check if the current transform is applied to source collection. - * @return - */ - private boolean connectedToSource() { - for (PValue value : userGraphContext.getInputs().values()) { - if (executionGraphContext.producedBySpout(value)) { - return true; - } - } - return false; - } + public void addTransformExecutor( + Executor executor, + Map<TupleTag<?>, PValue> inputs, + Map<TupleTag<?>, PValue> outputs, + List<PValue> sideInputs) { + String name = null; + ExecutorsBolt bolt = null; + + boolean isGBK = false; /** - * @param upstreamExecutorsBolt - * @return true if there is multiple input streams, or upstream executor output the same stream - * to different executors + * Check if the transform executor needs to be chained into an existing ExecutorsBolt. + * For following cases, a new bolt is created for the specified executor, otherwise the executor + * will be added into the bolt contains corresponding upstream executor. + * a) it is a GroupByKey executor + * b) it is connected to source directly + * c) None existing upstream bolt was found + * d) For the purpose of performance to reduce the side effects between multiple streams which + * is output to same executor, a new bolt will be created. */ - private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) { - if (inputs.size() > 1) { - return true; - } else { - final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet()); - if (!intersection.isEmpty()) { - // there is already a different executor consume the same input - return true; - } else { - return false; - } - } + if (RunnerUtils.isGroupByKeyExecutor(executor)) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + isGBK = true; + } else if (connectedToSource()) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } else { + name = getUpstreamExecutorsBolt(); + if (name == null) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } else { + bolt = executionGraphContext.getBolt(name); + if (isMultipleInputOrOutput(bolt, inputs)) { + bolt = new ExecutorsBolt(); + name = executionGraphContext.registerBolt(bolt); + } + } } - public void addTransformExecutor(Executor executor) { - addTransformExecutor(executor, Collections.EMPTY_LIST); - } - - public void addTransformExecutor(Executor executor, List<PValue> sideInputs) { - addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs); - } - - public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) { - addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST); - } - - public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) { - String name = null; - - ExecutorsBolt bolt = null; - - boolean isGBK = false; - /** - * Check if the transform executor needs to be chained into an existing ExecutorsBolt. - * For following cases, a new bolt is created for the specified executor, otherwise the executor - * will be added into the bolt contains corresponding upstream executor. - * a) it is a GroupByKey executor - * b) it is connected to source directly - * c) None existing upstream bolt was found - * d) For the purpose of performance to reduce the side effects between multiple streams which - * is output to same executor, a new bolt will be created. - */ - if (RunnerUtils.isGroupByKeyExecutor(executor)) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - isGBK = true; - } else if (connectedToSource()) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } else { - name = getUpstreamExecutorsBolt(); - if (name == null) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } else { - bolt = executionGraphContext.getBolt(name); - if (isMultipleInputOrOutput(bolt, inputs)) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } - } - } - - // update the output tags of current transform into ExecutorsBolt - for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { - TupleTag tag = entry.getKey(); - PValue value = entry.getValue(); - - // use tag of PValueBase - if (value instanceof PValueBase) { - tag = ((PValueBase) value).expand().keySet().iterator().next(); - } - executionGraphContext.registerStreamProducer( - TaggedPValue.of(tag, value), - Stream.Producer.of(name, tag.getId(), value.getName())); - //bolt.addOutputTags(tag); - } + // update the output tags of current transform into ExecutorsBolt + for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { + TupleTag tag = entry.getKey(); + PValue value = entry.getValue(); + + // use tag of PValueBase + if (value instanceof PValueBase) { + tag = ((PValueBase) value).expand().keySet().iterator().next(); + } + executionGraphContext.registerStreamProducer( + TaggedPValue.of(tag, value), + Stream.Producer.of(name, tag.getId(), value.getName())); + //bolt.addOutputTags(tag); + } - // add the transform executor into the chain of ExecutorsBolt - for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) { - TupleTag tag = entry.getKey(); - PValue value = entry.getValue(); - bolt.addExecutor(tag, executor); - - // filter all connections inside bolt - //if (!bolt.getOutputTags().contains(tag)) { - Stream.Grouping grouping; - if (isGBK) { - grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY)); - } else { - grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE); - } - addStormStreamDef(TaggedPValue.of(tag, value), name, grouping); - //} - } + // add the transform executor into the chain of ExecutorsBolt + for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) { + TupleTag tag = entry.getKey(); + PValue value = entry.getValue(); + bolt.addExecutor(tag, executor); + + // filter all connections inside bolt + //if (!bolt.getOutputTags().contains(tag)) { + Stream.Grouping grouping; + if (isGBK) { + grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY)); + } else { + grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE); + } + addStormStreamDef(TaggedPValue.of(tag, value), name, grouping); + //} + } - for (PValue sideInput : sideInputs) { - TupleTag tag = userGraphContext.findTupleTag(sideInput); - bolt.addExecutor(tag, executor); - checkState(!bolt.getOutputTags().contains(tag)); - addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL)); - } + for (PValue sideInput : sideInputs) { + TupleTag tag = userGraphContext.findTupleTag(sideInput); + bolt.addExecutor(tag, executor); + checkState(!bolt.getOutputTags().contains(tag)); + addStormStreamDef( + TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL)); + } - bolt.registerExecutor(executor); + bolt.registerExecutor(executor); - // set parallelismNumber - String pTransformfullName = userGraphContext.currentTransform.getFullName(); - String compositeName = pTransformfullName.split("/")[0]; - Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap(); - if (parallelismNumMap.containsKey(compositeName)) { - int configNum = (Integer) parallelismNumMap.get(compositeName); - int currNum = bolt.getParallelismNum(); - bolt.setParallelismNum(Math.max(configNum, currNum)); - } + // set parallelismNumber + String pTransformfullName = userGraphContext.currentTransform.getFullName(); + String compositeName = pTransformfullName.split("/")[0]; + Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap(); + if (parallelismNumMap.containsKey(compositeName)) { + int configNum = (Integer) parallelismNumMap.get(compositeName); + int currNum = bolt.getParallelismNum(); + bolt.setParallelismNum(Math.max(configNum, currNum)); } + } - // TODO: add getSideInputs() and getSideOutputs(). - public static class UserGraphContext { - private final JStormPipelineOptions options; - private final Map<PValue, TupleTag> pValueToTupleTag; - private AppliedPTransform<?, ?, ?> currentTransform = null; + // TODO: add getSideInputs() and getSideOutputs(). + public static class UserGraphContext { + private final JStormPipelineOptions options; + private final Map<PValue, TupleTag> pValueToTupleTag; + private AppliedPTransform<?, ?, ?> currentTransform = null; - private boolean isWindowed = false; + private boolean isWindowed = false; - public UserGraphContext(JStormPipelineOptions options) { - this.options = checkNotNull(options, "options"); - this.pValueToTupleTag = Maps.newHashMap(); - } + public UserGraphContext(JStormPipelineOptions options) { + this.options = checkNotNull(options, "options"); + this.pValueToTupleTag = Maps.newHashMap(); + } - public JStormPipelineOptions getOptions() { - return this.options; - } + public JStormPipelineOptions getOptions() { + return this.options; + } - public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { - this.currentTransform = transform; - } + public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { + this.currentTransform = transform; + } - public String getStepName() { - return currentTransform.getFullName(); - } + public String getStepName() { + return currentTransform.getFullName(); + } - public <T extends PValue> T getInput() { - return (T) currentTransform.getInputs().values().iterator().next(); - } + public <T extends PValue> T getInput() { + return (T) currentTransform.getInputs().values().iterator().next(); + } - public Map<TupleTag<?>, PValue> getInputs() { - return currentTransform.getInputs(); - } + public Map<TupleTag<?>, PValue> getInputs() { + return currentTransform.getInputs(); + } - public TupleTag<?> getInputTag() { - return currentTransform.getInputs().keySet().iterator().next(); - } + public TupleTag<?> getInputTag() { + return currentTransform.getInputs().keySet().iterator().next(); + } - public List<TupleTag<?>> getInputTags() { - return Lists.newArrayList(currentTransform.getInputs().keySet()); - } + public List<TupleTag<?>> getInputTags() { + return Lists.newArrayList(currentTransform.getInputs().keySet()); + } - public <T extends PValue> T getOutput() { - return (T) currentTransform.getOutputs().values().iterator().next(); - } + public <T extends PValue> T getOutput() { + return (T) currentTransform.getOutputs().values().iterator().next(); + } - public Map<TupleTag<?>, PValue> getOutputs() { - return currentTransform.getOutputs(); - } + public Map<TupleTag<?>, PValue> getOutputs() { + return currentTransform.getOutputs(); + } - public TupleTag<?> getOutputTag() { - return currentTransform.getOutputs().keySet().iterator().next(); - } + public TupleTag<?> getOutputTag() { + return currentTransform.getOutputs().keySet().iterator().next(); + } - public List<TupleTag<?>> getOutputTags() { - return Lists.newArrayList(currentTransform.getOutputs().keySet()); - } + public List<TupleTag<?>> getOutputTags() { + return Lists.newArrayList(currentTransform.getOutputs().keySet()); + } - public void recordOutputTaggedPValue() { - for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) { - pValueToTupleTag.put(entry.getValue(), entry.getKey()); - } - } + public void recordOutputTaggedPValue() { + for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) { + pValueToTupleTag.put(entry.getValue(), entry.getKey()); + } + } - public <T> TupleTag<T> findTupleTag(PValue pValue) { - return pValueToTupleTag.get(checkNotNull(pValue, "pValue")); - } + public <T> TupleTag<T> findTupleTag(PValue pValue) { + return pValueToTupleTag.get(checkNotNull(pValue, "pValue")); + } - public void setWindowed() { - this.isWindowed = true; - } + public void setWindowed() { + this.isWindowed = true; + } - public boolean isWindowed() { - return this.isWindowed; - } + public boolean isWindowed() { + return this.isWindowed; + } - @Override - public String toString() { - return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet()) - .transform(new Function<Map.Entry<PValue,TupleTag>, String>() { - @Override - public String apply(Map.Entry<PValue, TupleTag> entry) { - return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName()); - }})); - } + @Override + public String toString() { + return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet()) + .transform(new Function<Map.Entry<PValue, TupleTag>, String>() { + @Override + public String apply(Map.Entry<PValue, TupleTag> entry) { + return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName()); + } + })); } + } - public static class ExecutionGraphContext { + public static class ExecutionGraphContext { - private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); - private final Map<String, ExecutorsBolt> boltMap = new HashMap<>(); + private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); + private final Map<String, ExecutorsBolt> boltMap = new HashMap<>(); - // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue). - private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>(); - private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>(); + // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue). + private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>(); + private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>(); - private final List<Stream> streams = new ArrayList<>(); + private final List<Stream> streams = new ArrayList<>(); - private int id = 1; + private int id = 1; - public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) { - checkNotNull(spout, "spout"); - checkNotNull(output, "output"); - String name = "spout" + genId(); - this.spoutMap.put(name, spout); - registerStreamProducer( - output, - Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName())); - } + public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) { + checkNotNull(spout, "spout"); + checkNotNull(output, "output"); + String name = "spout" + genId(); + this.spoutMap.put(name, spout); + registerStreamProducer( + output, + Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName())); + } - public AdaptorBasicSpout getSpout(String id) { - if (Strings.isNullOrEmpty(id)) { - return null; - } - return this.spoutMap.get(id); - } + public AdaptorBasicSpout getSpout(String id) { + if (Strings.isNullOrEmpty(id)) { + return null; + } + return this.spoutMap.get(id); + } - public Map<String, AdaptorBasicSpout> getSpouts() { - return this.spoutMap; - } + public Map<String, AdaptorBasicSpout> getSpouts() { + return this.spoutMap; + } - public String registerBolt(ExecutorsBolt bolt) { - checkNotNull(bolt, "bolt"); - String name = "bolt" + genId(); - this.boltMap.put(name, bolt); - return name; - } + public String registerBolt(ExecutorsBolt bolt) { + checkNotNull(bolt, "bolt"); + String name = "bolt" + genId(); + this.boltMap.put(name, bolt); + return name; + } - public ExecutorsBolt getBolt(String id) { - if (Strings.isNullOrEmpty(id)) { - return null; - } - return this.boltMap.get(id); - } + public ExecutorsBolt getBolt(String id) { + if (Strings.isNullOrEmpty(id)) { + return null; + } + return this.boltMap.get(id); + } - public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) { - checkNotNull(taggedPValue, "taggedPValue"); - checkNotNull(producer, "producer"); - pValueToProducer.put(taggedPValue.getValue(), producer); - producerToTaggedPValue.put(producer, taggedPValue); - } + public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) { + checkNotNull(taggedPValue, "taggedPValue"); + checkNotNull(producer, "producer"); + pValueToProducer.put(taggedPValue.getValue(), producer); + producerToTaggedPValue.put(producer, taggedPValue); + } - public Stream.Producer getProducer(PValue pValue) { - return pValueToProducer.get(checkNotNull(pValue, "pValue")); - } + public Stream.Producer getProducer(PValue pValue) { + return pValueToProducer.get(checkNotNull(pValue, "pValue")); + } - public String getProducerComponentId(PValue pValue) { - Stream.Producer producer = getProducer(pValue); - return producer == null ? null : producer.getComponentId(); - } + public String getProducerComponentId(PValue pValue) { + Stream.Producer producer = getProducer(pValue); + return producer == null ? null : producer.getComponentId(); + } - public boolean producedBySpout(PValue pValue) { - String componentId = getProducerComponentId(pValue); - return getSpout(componentId) != null; - } + public boolean producedBySpout(PValue pValue) { + String componentId = getProducerComponentId(pValue); + return getSpout(componentId) != null; + } - public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) { - streams.add(Stream.of( - checkNotNull(producer, "producer"), - checkNotNull(consumer, "consumer"))); - } + public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) { + streams.add(Stream.of( + checkNotNull(producer, "producer"), + checkNotNull(consumer, "consumer"))); + } - public Map<PValue, Stream.Producer> getPValueToProducers() { - return pValueToProducer; - } + public Map<PValue, Stream.Producer> getPValueToProducers() { + return pValueToProducer; + } - public Iterable<Stream> getStreams() { - return streams; - } + public Iterable<Stream> getStreams() { + return streams; + } - @Override - public String toString() { - List<String> ret = new ArrayList<>(); - ret.add("SPOUT"); - for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) { - ret.add(entry.getKey() + ": " + entry.getValue().toString()); - } - ret.add("BOLT"); - for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) { - ret.add(entry.getKey() + ": " + entry.getValue().toString()); - } - ret.add("STREAM"); - for (Stream stream : streams) { - ret.add(String.format( - "%s@@%s ---> %s@@%s", - stream.getProducer().getStreamId(), - stream.getProducer().getComponentId(), - stream.getConsumer().getGrouping(), - stream.getConsumer().getComponentId())); - } - return Joiner.on("\n").join(ret); - } + @Override + public String toString() { + List<String> ret = new ArrayList<>(); + ret.add("SPOUT"); + for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) { + ret.add(entry.getKey() + ": " + entry.getValue().toString()); + } + ret.add("BOLT"); + for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) { + ret.add(entry.getKey() + ": " + entry.getValue().toString()); + } + ret.add("STREAM"); + for (Stream stream : streams) { + ret.add(String.format( + "%s@@%s ---> %s@@%s", + stream.getProducer().getStreamId(), + stream.getProducer().getComponentId(), + stream.getConsumer().getGrouping(), + stream.getConsumer().getComponentId())); + } + return Joiner.on("\n").join(ret); + } - private synchronized int genId() { - return id++; - } + private synchronized int genId() { + return id++; } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index a33f07b..bce5b3e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.jstorm.translation; +import java.util.HashMap; +import java.util.Map; import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator; import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator; import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator; @@ -35,50 +37,49 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - /** * Lookup table mapping PTransform types to associated TransformTranslator implementations. */ public class TranslatorRegistry { - private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); + private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); - private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>(); + private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = + new HashMap<>(); - static { - TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); - TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); - // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); - // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + static { + TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); + TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); + // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); + // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); - TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); + TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); - //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); - TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); + //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); + TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); - TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); + TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); + TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); - /** - * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be - * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms. - * If any improvement is required, the composite transforms will be translated in the future. - */ - // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); - // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); - } + /** + * Currently, empty translation is required for combine and reshuffle. + * Because, the transforms will be mapped to GroupByKey and Pardo finally. + * So we only need to translator the finally transforms. + * If any improvement is required, the composite transforms will be translated in the future. + */ + // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); + // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); + // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); + } - public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); - if (translator == null) { - LOG.warn("Unsupported operator={}", transform.getClass().getName()); - } - return translator; + public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); + if (translator == null) { + LOG.warn("Unsupported operator={}", transform.getClass().getName()); } + return translator; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java index b07b426..68e9e17 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java @@ -17,54 +17,52 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import java.util.HashMap; -import java.util.Map; - -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; - import backtype.storm.topology.IComponent; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; /* * Enable user to add output stream definitions by API, rather than hard-code. */ public abstract class AbstractComponent implements IComponent { - private Map<String, Fields> streamToFields = new HashMap<>(); - private Map<String, Boolean> keyStreams = new HashMap<>(); - private int parallelismNum = 0; + private Map<String, Fields> streamToFields = new HashMap<>(); + private Map<String, Boolean> keyStreams = new HashMap<>(); + private int parallelismNum = 0; - public void addOutputField(String streamId) { - addOutputField(streamId, new Fields(CommonInstance.VALUE)); - } + public void addOutputField(String streamId) { + addOutputField(streamId, new Fields(CommonInstance.VALUE)); + } - public void addOutputField(String streamId, Fields fields) { - streamToFields.put(streamId, fields); - keyStreams.put(streamId, false); - } + public void addOutputField(String streamId, Fields fields) { + streamToFields.put(streamId, fields); + keyStreams.put(streamId, false); + } - public void addKVOutputField(String streamId) { - streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); - keyStreams.put(streamId, true); - } + public void addKVOutputField(String streamId) { + streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); + keyStreams.put(streamId, true); + } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { - declarer.declareStream(entry.getKey(), entry.getValue()); - } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { + declarer.declareStream(entry.getKey(), entry.getValue()); } + } - public boolean keyedEmit(String streamId) { - Boolean isKeyedStream = keyStreams.get(streamId); - return isKeyedStream == null ? false : isKeyedStream; - } + public boolean keyedEmit(String streamId) { + Boolean isKeyedStream = keyStreams.get(streamId); + return isKeyedStream == null ? false : isKeyedStream; + } - public int getParallelismNum() { - return parallelismNum; - } + public int getParallelismNum() { + return parallelismNum; + } - public void setParallelismNum(int num) { - parallelismNum = num; - } + public void setParallelismNum(int num) { + parallelismNum = num; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java index 91881f2..5e9b056 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java @@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime; import backtype.storm.topology.IRichBatchBolt; public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt { - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java index 5a0c6ec..0480518 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java @@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime; import backtype.storm.topology.IRichSpout; public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout { - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java index c73a3b8..9507948 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java @@ -17,312 +17,319 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import java.io.Serializable; -import java.util.*; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import avro.shaded.com.google.common.collect.Iterables; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; - import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.metric.MetricClient; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; +import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; +import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext; +import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext; -import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - public class DoFnExecutor<InputT, OutputT> implements Executor { - private static final long serialVersionUID = 5297603063991078668L; + private static final long serialVersionUID = 5297603063991078668L; - private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); + private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); - public class DoFnExecutorOutputManager implements OutputManager, Serializable { - private static final long serialVersionUID = -661113364735206170L; + public class DoFnExecutorOutputManager implements OutputManager, Serializable { + private static final long serialVersionUID = -661113364735206170L; - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - protected transient DoFnRunner<InputT, OutputT> runner = null; - protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; - - protected final String stepName; - - protected int internalDoFnExecutorId; - - protected final String description; - - protected final TupleTag<OutputT> mainTupleTag; - protected final List<TupleTag<?>> sideOutputTags; - - protected SerializedPipelineOptions serializedOptions; - protected transient JStormPipelineOptions pipelineOptions; - - protected DoFn<InputT, OutputT> doFn; - protected final Coder<WindowedValue<InputT>> inputCoder; - protected DoFnInvoker<InputT, OutputT> doFnInvoker; - protected OutputManager outputManager; - protected WindowingStrategy<?, ?> windowingStrategy; - protected final TupleTag<InputT> mainInputTag; - protected Collection<PCollectionView<?>> sideInputs; - protected SideInputHandler sideInputHandler; - protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; - - // Initialize during runtime - protected ExecutorContext executorContext; - protected ExecutorsBolt executorsBolt; - protected TimerInternals timerInternals; - protected transient StateInternals pushbackStateInternals; - protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; - protected transient StateTag<WatermarkHoldState> watermarkHoldTag; - protected transient IKvStoreManager kvStoreManager; - protected DefaultStepContext stepContext; - protected transient MetricClient metricClient; - - public DoFnExecutor( - String stepName, - String description, - JStormPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, - Coder<WindowedValue<InputT>> inputCoder, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<InputT> mainInputTag, - Collection<PCollectionView<?>> sideInputs, - Map<TupleTag, PCollectionView<?>> sideInputTagToView, - TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags) { - this.stepName = checkNotNull(stepName, "stepName"); - this.description = checkNotNull(description, "description"); - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - this.doFn = doFn; - this.inputCoder = inputCoder; - this.outputManager = new DoFnExecutorOutputManager(); - this.windowingStrategy = windowingStrategy; - this.mainInputTag = mainInputTag; - this.sideInputs = sideInputs; - this.mainTupleTag = mainTupleTag; - this.sideOutputTags = sideOutputTags; - this.sideInputTagToView = sideInputTagToView; + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + executorsBolt.processExecutorElem(tag, output); } - - protected DoFnRunner<InputT, OutputT> getDoFnRunner() { - return new DoFnRunnerWithMetrics<>( - stepName, - DoFnRunners.simpleRunner( - this.pipelineOptions, - this.doFn, - this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler, - this.outputManager, - this.mainTupleTag, - this.sideOutputTags, - this.stepContext, - this.windowingStrategy), - MetricsReporter.create(metricClient)); + } + + protected transient DoFnRunner<InputT, OutputT> runner = null; + protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; + + protected final String stepName; + + protected int internalDoFnExecutorId; + + protected final String description; + + protected final TupleTag<OutputT> mainTupleTag; + protected final List<TupleTag<?>> sideOutputTags; + + protected SerializedPipelineOptions serializedOptions; + protected transient JStormPipelineOptions pipelineOptions; + + protected DoFn<InputT, OutputT> doFn; + protected final Coder<WindowedValue<InputT>> inputCoder; + protected DoFnInvoker<InputT, OutputT> doFnInvoker; + protected OutputManager outputManager; + protected WindowingStrategy<?, ?> windowingStrategy; + protected final TupleTag<InputT> mainInputTag; + protected Collection<PCollectionView<?>> sideInputs; + protected SideInputHandler sideInputHandler; + protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; + + // Initialize during runtime + protected ExecutorContext executorContext; + protected ExecutorsBolt executorsBolt; + protected TimerInternals timerInternals; + protected transient StateInternals pushbackStateInternals; + protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; + protected transient StateTag<WatermarkHoldState> watermarkHoldTag; + protected transient IKvStoreManager kvStoreManager; + protected DefaultStepContext stepContext; + protected transient MetricClient metricClient; + + public DoFnExecutor( + String stepName, + String description, + JStormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags) { + this.stepName = checkNotNull(stepName, "stepName"); + this.description = checkNotNull(description, "description"); + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.doFn = doFn; + this.inputCoder = inputCoder; + this.outputManager = new DoFnExecutorOutputManager(); + this.windowingStrategy = windowingStrategy; + this.mainInputTag = mainInputTag; + this.sideInputs = sideInputs; + this.mainTupleTag = mainTupleTag; + this.sideOutputTags = sideOutputTags; + this.sideInputTagToView = sideInputTagToView; + } + + protected DoFnRunner<InputT, OutputT> getDoFnRunner() { + return new DoFnRunnerWithMetrics<>( + stepName, + DoFnRunners.simpleRunner( + this.pipelineOptions, + this.doFn, + this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler, + this.outputManager, + this.mainTupleTag, + this.sideOutputTags, + this.stepContext, + this.windowingStrategy), + MetricsReporter.create(metricClient)); + } + + protected void initService(ExecutorContext context) { + // TODO: what should be set for key in here? + timerInternals = new JStormTimerInternals( + null /* key */, this, context.getExecutorsBolt().timerService()); + kvStoreManager = context.getKvStoreManager(); + stepContext = new DefaultStepContext(timerInternals, + new JStormStateInternals( + null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + metricClient = new MetricClient(executorContext.getTopologyContext()); + } + + @Override + public void init(ExecutorContext context) { + this.executorContext = context; + this.executorsBolt = context.getExecutorsBolt(); + this.pipelineOptions = + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + + initService(context); + + // Side inputs setup + if (sideInputs != null && sideInputs.isEmpty() == false) { + pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); + watermarkHoldTag = + StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); + pushbackStateInternals = new JStormStateInternals( + null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); + sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); + runner = getDoFnRunner(); + pushbackRunner = + SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); + } else { + runner = getDoFnRunner(); } - protected void initService(ExecutorContext context) { - // TODO: what should be set for key in here? - timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService()); - kvStoreManager = context.getKvStoreManager(); - stepContext = new DefaultStepContext(timerInternals, - new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - metricClient = new MetricClient(executorContext.getTopologyContext()); + // Process user's setup + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", + tag, mainInputTag, sideInputs, elem.getValue())); + if (mainInputTag.equals(tag)) { + processMainInput(elem); + } else { + processSideInput(tag, elem); } - - @Override - public void init(ExecutorContext context) { - this.executorContext = context; - this.executorsBolt = context.getExecutorsBolt(); - this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); - - initService(context); - - // Side inputs setup - if (sideInputs != null && sideInputs.isEmpty() == false) { - pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); - watermarkHoldTag = - StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); - pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); - sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); - runner = getDoFnRunner(); - pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); - } else { - runner = getDoFnRunner(); + } + + protected <T> void processMainInput(WindowedValue<T> elem) { + if (sideInputs.isEmpty()) { + runner.processElement((WindowedValue<InputT>) elem); + } else { + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + if (pushedBackValue.getTimestamp().isBefore(min)) { + min = pushedBackValue.getTimestamp(); } - - // Process user's setup - doFnInvoker = DoFnInvokers.invokerFor(doFn); - doFnInvoker.invokeSetup(); + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); + } + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); } + } - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", - tag, mainInputTag, sideInputs, elem.getValue())); - if (mainInputTag.equals(tag)) { - processMainInput(elem); - } else { - processSideInput(tag, elem); - } - } - - protected <T> void processMainInput(WindowedValue<T> elem) { - if (sideInputs.isEmpty()) { - runner.processElement((WindowedValue<InputT>) elem); - } else { - Iterable<WindowedValue<InputT>> justPushedBack = - pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : justPushedBack) { - if (pushedBackValue.getTimestamp().isBefore(min)) { - min = pushedBackValue.getTimestamp(); - } - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); - } - } - - protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); - - PCollectionView<?> sideInputView = sideInputTagToView.get(tag); - sideInputHandler.addSideInputValue(sideInputView, elem); + protected void processSideInput(TupleTag tag, WindowedValue elem) { + LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + PCollectionView<?> sideInputView = sideInputTagToView.get(tag); + sideInputHandler.addSideInputValue(sideInputView, elem); - List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); - if (pushedBackInputs != null) { - for (WindowedValue<InputT> input : pushedBackInputs) { + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); - Iterable<WindowedValue<InputT>> justPushedBack = - pushbackRunner.processElementInReadyWindows(input); - Iterables.addAll(newPushedBack, justPushedBack); - } - } - pushedBack.clear(); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : newPushedBack) { - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } + Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); + if (pushedBackInputs != null) { + for (WindowedValue<InputT> input : pushedBackInputs) { - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - // TODO: clear-then-add is not thread-safe. - watermarkHold.clear(); - watermarkHold.add(min); + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackRunner.processElementInReadyWindows(input); + Iterables.addAll(newPushedBack, justPushedBack); + } } + pushedBack.clear(); - /** - * Process all pushed back elements when receiving watermark with max timestamp - */ - public void processAllPushBackElements() { - if (sideInputs != null && sideInputs.isEmpty() == false) { - BagState<WindowedValue<InputT>> pushedBackElements = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - if (pushedBackElements != null) { - for (WindowedValue<InputT> elem : pushedBackElements.read()) { - LOG.info("Process pushback elem={}", elem); - runner.processElement(elem); - } - pushedBackElements.clear(); - } - - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - watermarkHold.clear(); - watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); - } + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); } - public void onTimer(Object key, TimerInternals.TimerData timerData) { - StateNamespace namespace = timerData.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); - BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); - if (pushbackRunner != null) { - pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); - } else { - runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + // TODO: clear-then-add is not thread-safe. + watermarkHold.clear(); + watermarkHold.add(min); + } + + /** + * Process all pushed back elements when receiving watermark with max timestamp + */ + public void processAllPushBackElements() { + if (sideInputs != null && sideInputs.isEmpty() == false) { + BagState<WindowedValue<InputT>> pushedBackElements = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + if (pushedBackElements != null) { + for (WindowedValue<InputT> elem : pushedBackElements.read()) { + LOG.info("Process pushback elem={}", elem); + runner.processElement(elem); } - } + pushedBackElements.clear(); + } - @Override - public void cleanup() { - doFnInvoker.invokeTeardown(); + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + watermarkHold.clear(); + watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); } - - @Override - public String toString() { - return description; + } + + public void onTimer(Object key, TimerInternals.TimerData timerData) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + if (pushbackRunner != null) { + pushbackRunner.onTimer( + timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } else { + runner.onTimer( + timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); } - - private Instant earlier(Instant left, Instant right) { - return left.isBefore(right) ? left : right; + } + + @Override + public void cleanup() { + doFnInvoker.invokeTeardown(); + } + + @Override + public String toString() { + return description; + } + + private Instant earlier(Instant left, Instant right) { + return left.isBefore(right) ? left : right; + } + + public void startBundle() { + if (pushbackRunner != null) { + pushbackRunner.startBundle(); + } else { + runner.startBundle(); } + } - public void startBundle() { - if (pushbackRunner != null) { - pushbackRunner.startBundle(); - } else { - runner.startBundle(); - } + public void finishBundle() { + if (pushbackRunner != null) { + pushbackRunner.finishBundle(); + } else { + runner.finishBundle(); } + } - public void finishBundle() { - if (pushbackRunner != null) { - pushbackRunner.finishBundle(); - } else { - runner.finishBundle(); - } - } + public void setInternalDoFnExecutorId(int id) { + this.internalDoFnExecutorId = id; + } - public void setInternalDoFnExecutorId(int id) { - this.internalDoFnExecutorId = id; - } - - public int getInternalDoFnExecutorId() { - return internalDoFnExecutorId; - } + public int getInternalDoFnExecutorId() { + return internalDoFnExecutorId; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java index 98dbcc5..1610a8a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java @@ -68,7 +68,8 @@ public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT } @Override - public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + public void onTimer( + String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( metricsReporter.getMetricsContainer(stepName))) { delegate.onTimer(timerId, window, timestamp, timeDomain); http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java index d7214db..1a03cb8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java @@ -18,17 +18,16 @@ package org.apache.beam.runners.jstorm.translation.runtime; import java.io.Serializable; - import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; public interface Executor extends Serializable { - /** - * Initialization during runtime - */ - void init(ExecutorContext context); + /** + * Initialization during runtime + */ + void init(ExecutorContext context); - <T> void process(TupleTag<T> tag, WindowedValue<T> elem); + <T> void process(TupleTag<T> tag, WindowedValue<T> elem); - void cleanup(); + void cleanup(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java index 1de881f..1f65921 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java @@ -23,13 +23,16 @@ import com.google.auto.value.AutoValue; @AutoValue public abstract class ExecutorContext { - public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) { - return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager); - } + public static ExecutorContext of( + TopologyContext topologyContext, + ExecutorsBolt bolt, + IKvStoreManager kvStoreManager) { + return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager); + } - public abstract TopologyContext getTopologyContext(); + public abstract TopologyContext getTopologyContext(); - public abstract ExecutorsBolt getExecutorsBolt(); + public abstract ExecutorsBolt getExecutorsBolt(); - public abstract IKvStoreManager getKvStoreManager(); + public abstract IKvStoreManager getKvStoreManager(); }
