http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java deleted file mode 100644 index c3e9805..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation; - -import avro.shaded.com.google.common.collect.Lists; -import com.alibaba.jstorm.beam.translation.translator.Stream; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.google.common.base.Strings; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicSpout; -import com.alibaba.jstorm.beam.translation.runtime.Executor; -import com.alibaba.jstorm.beam.translation.runtime.ExecutorsBolt; -import com.alibaba.jstorm.beam.translation.util.CommonInstance; - -import java.util.*; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -/** - * Maintains the state necessary during Pipeline translation to build a Storm topology. 
- */ -public class TranslationContext { - private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class); - - private final UserGraphContext userGraphContext; - private final ExecutionGraphContext executionGraphContext; - - public TranslationContext(StormPipelineOptions options) { - this.userGraphContext = new UserGraphContext(options); - this.executionGraphContext = new ExecutionGraphContext(); - } - - public ExecutionGraphContext getExecutionGraphContext() { - return executionGraphContext; - } - - public UserGraphContext getUserGraphContext() { - return userGraphContext; - } - - private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) { - Stream.Producer producer = executionGraphContext.getProducer(input.getValue()); - if (!producer.getComponentId().equals(destComponentName)) { - Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping); - executionGraphContext.registerStreamConsumer(consumer, producer); - - ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId()); - if (executorsBolt != null) { - executorsBolt.addExternalOutputTag(input.getTag()); - } - } - } - - private String getUpstreamExecutorsBolt() { - for (PValue value : userGraphContext.getInputs().values()) { - String componentId = executionGraphContext.getProducerComponentId(value); - if (componentId != null && executionGraphContext.getBolt(componentId) != null) { - return componentId; - } - } - // When upstream component is spout, "null" will be return. - return null; - } - - /** - * check if the current transform is applied to source collection. 
- * @return - */ - private boolean connectedToSource() { - for (PValue value : userGraphContext.getInputs().values()) { - if (executionGraphContext.producedBySpout(value)) { - return true; - } - } - return false; - } - - /** - * @param upstreamExecutorsBolt - * @return true if there is multiple input streams, or upstream executor output the same stream - * to different executors - */ - private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) { - if (inputs.size() > 1) { - return true; - } else { - final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet()); - if (!intersection.isEmpty()) { - // there is already a different executor consume the same input - return true; - } else { - return false; - } - } - } - - public void addTransformExecutor(Executor executor) { - addTransformExecutor(executor, Collections.EMPTY_LIST); - } - - public void addTransformExecutor(Executor executor, List<PValue> sideInputs) { - addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs); - } - - public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) { - addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST); - } - - public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) { - String name = null; - - ExecutorsBolt bolt = null; - - boolean isGBK = false; - /** - * Check if the transform executor needs to be chained into an existing ExecutorsBolt. - * For following cases, a new bolt is created for the specified executor, otherwise the executor - * will be added into the bolt contains corresponding upstream executor. 
- * a) it is a GroupByKey executor - * b) it is connected to source directly - * c) None existing upstream bolt was found - * d) For the purpose of performance to reduce the side effects between multiple streams which - * is output to same executor, a new bolt will be created. - */ - if (RunnerUtils.isGroupByKeyExecutor(executor)) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - isGBK = true; - } else if (connectedToSource()) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } else { - name = getUpstreamExecutorsBolt(); - if (name == null) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } else { - bolt = executionGraphContext.getBolt(name); - if (isMultipleInputOrOutput(bolt, inputs)) { - bolt = new ExecutorsBolt(); - name = executionGraphContext.registerBolt(bolt); - } - } - } - - // update the output tags of current transform into ExecutorsBolt - for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { - TupleTag tag = entry.getKey(); - PValue value = entry.getValue(); - - // use tag of PValueBase - if (value instanceof PValueBase) { - tag = ((PValueBase) value).expand().keySet().iterator().next(); - } - executionGraphContext.registerStreamProducer( - TaggedPValue.of(tag, value), - Stream.Producer.of(name, tag.getId(), value.getName())); - //bolt.addOutputTags(tag); - } - - // add the transform executor into the chain of ExecutorsBolt - for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) { - TupleTag tag = entry.getKey(); - PValue value = entry.getValue(); - bolt.addExecutor(tag, executor); - - // filter all connections inside bolt - //if (!bolt.getOutputTags().contains(tag)) { - Stream.Grouping grouping; - if (isGBK) { - grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY)); - } else { - grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE); - } - addStormStreamDef(TaggedPValue.of(tag, value), name, 
grouping); - //} - } - - for (PValue sideInput : sideInputs) { - TupleTag tag = userGraphContext.findTupleTag(sideInput); - bolt.addExecutor(tag, executor); - checkState(!bolt.getOutputTags().contains(tag)); - addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL)); - } - - bolt.registerExecutor(executor); - - // set parallelismNumber - String pTransformfullName = userGraphContext.currentTransform.getFullName(); - String compositeName = pTransformfullName.split("/")[0]; - Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap(); - if (parallelismNumMap.containsKey(compositeName)) { - int configNum = (Integer) parallelismNumMap.get(compositeName); - int currNum = bolt.getParallelismNum(); - bolt.setParallelismNum(Math.max(configNum, currNum)); - } - } - - // TODO: add getSideInputs() and getSideOutputs(). - public static class UserGraphContext { - private final StormPipelineOptions options; - private final Map<PValue, TupleTag> pValueToTupleTag; - private AppliedPTransform<?, ?, ?> currentTransform = null; - - private boolean isWindowed = false; - - public UserGraphContext(StormPipelineOptions options) { - this.options = checkNotNull(options, "options"); - this.pValueToTupleTag = Maps.newHashMap(); - } - - public StormPipelineOptions getOptions() { - return this.options; - } - - public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { - this.currentTransform = transform; - } - - public String getStepName() { - return currentTransform.getFullName(); - } - - public <T extends PValue> T getInput() { - return (T) currentTransform.getInputs().values().iterator().next(); - } - - public Map<TupleTag<?>, PValue> getInputs() { - return currentTransform.getInputs(); - } - - public TupleTag<?> getInputTag() { - return currentTransform.getInputs().keySet().iterator().next(); - } - - public List<TupleTag<?>> getInputTags() { - return Lists.newArrayList(currentTransform.getInputs().keySet()); - } - 
- public <T extends PValue> T getOutput() { - return (T) currentTransform.getOutputs().values().iterator().next(); - } - - public Map<TupleTag<?>, PValue> getOutputs() { - return currentTransform.getOutputs(); - } - - public TupleTag<?> getOutputTag() { - return currentTransform.getOutputs().keySet().iterator().next(); - } - - public List<TupleTag<?>> getOutputTags() { - return Lists.newArrayList(currentTransform.getOutputs().keySet()); - } - - public void recordOutputTaggedPValue() { - for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) { - pValueToTupleTag.put(entry.getValue(), entry.getKey()); - } - } - - public <T> TupleTag<T> findTupleTag(PValue pValue) { - return pValueToTupleTag.get(checkNotNull(pValue, "pValue")); - } - - public void setWindowed() { - this.isWindowed = true; - } - - public boolean isWindowed() { - return this.isWindowed; - } - - @Override - public String toString() { - return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet()) - .transform(new Function<Map.Entry<PValue,TupleTag>, String>() { - @Override - public String apply(Map.Entry<PValue, TupleTag> entry) { - return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName()); - }})); - } - } - - public static class ExecutionGraphContext { - - private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); - private final Map<String, ExecutorsBolt> boltMap = new HashMap<>(); - - // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue). 
- private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>(); - private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>(); - - private final List<Stream> streams = new ArrayList<>(); - - private int id = 1; - - public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) { - checkNotNull(spout, "spout"); - checkNotNull(output, "output"); - String name = "spout" + genId(); - this.spoutMap.put(name, spout); - registerStreamProducer( - output, - Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName())); - } - - public AdaptorBasicSpout getSpout(String id) { - if (Strings.isNullOrEmpty(id)) { - return null; - } - return this.spoutMap.get(id); - } - - public Map<String, AdaptorBasicSpout> getSpouts() { - return this.spoutMap; - } - - public String registerBolt(ExecutorsBolt bolt) { - checkNotNull(bolt, "bolt"); - String name = "bolt" + genId(); - this.boltMap.put(name, bolt); - return name; - } - - public ExecutorsBolt getBolt(String id) { - if (Strings.isNullOrEmpty(id)) { - return null; - } - return this.boltMap.get(id); - } - - public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) { - checkNotNull(taggedPValue, "taggedPValue"); - checkNotNull(producer, "producer"); - pValueToProducer.put(taggedPValue.getValue(), producer); - producerToTaggedPValue.put(producer, taggedPValue); - } - - public Stream.Producer getProducer(PValue pValue) { - return pValueToProducer.get(checkNotNull(pValue, "pValue")); - } - - public String getProducerComponentId(PValue pValue) { - Stream.Producer producer = getProducer(pValue); - return producer == null ? 
null : producer.getComponentId(); - } - - public boolean producedBySpout(PValue pValue) { - String componentId = getProducerComponentId(pValue); - return getSpout(componentId) != null; - } - - public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) { - streams.add(Stream.of( - checkNotNull(producer, "producer"), - checkNotNull(consumer, "consumer"))); - } - - public Map<PValue, Stream.Producer> getPValueToProducers() { - return pValueToProducer; - } - - public Iterable<Stream> getStreams() { - return streams; - } - - @Override - public String toString() { - List<String> ret = new ArrayList<>(); - ret.add("SPOUT"); - for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) { - ret.add(entry.getKey() + ": " + entry.getValue().toString()); - } - ret.add("BOLT"); - for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) { - ret.add(entry.getKey() + ": " + entry.getValue().toString()); - } - ret.add("STREAM"); - for (Stream stream : streams) { - ret.add(String.format( - "%s@@%s ---> %s@@%s", - stream.getProducer().getStreamId(), - stream.getProducer().getComponentId(), - stream.getConsumer().getGrouping(), - stream.getConsumer().getComponentId())); - } - return Joiner.on("\n").join(ret); - } - - private synchronized int genId() { - return id++; - } - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java deleted file mode 100644 index 5e92eea..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation; - -import com.alibaba.jstorm.beam.translation.translator.*; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * Lookup table mapping PTransform types to associated TransformTranslator implementations. - */ -public class TranslatorRegistry { - private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); - - private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>(); - - static { - TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); - TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); - // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); - // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - - TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); - TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); - - //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); - TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); - - TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); - - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - - TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); - - /** - * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be - * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms. 
- * If any improvement is required, the composite transforms will be translated in the future. - */ - // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); - // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); - } - - public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); - if (translator == null) { - LOG.warn("Unsupported operator={}", transform.getClass().getName()); - } - return translator; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java deleted file mode 100644 index 876546d..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.runtime; - -import java.util.HashMap; -import java.util.Map; - -import com.alibaba.jstorm.beam.translation.util.CommonInstance; - -import backtype.storm.topology.IComponent; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -/* - * Enable user to add output stream definitions by API, rather than hard-code. - */ -public abstract class AbstractComponent implements IComponent { - private Map<String, Fields> streamToFields = new HashMap<>(); - private Map<String, Boolean> keyStreams = new HashMap<>(); - private int parallelismNum = 0; - - public void addOutputField(String streamId) { - addOutputField(streamId, new Fields(CommonInstance.VALUE)); - } - - public void addOutputField(String streamId, Fields fields) { - streamToFields.put(streamId, fields); - keyStreams.put(streamId, false); - } - - public void addKVOutputField(String streamId) { - streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); - keyStreams.put(streamId, true); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { - declarer.declareStream(entry.getKey(), entry.getValue()); - } - } - - public boolean keyedEmit(String streamId) { - Boolean isKeyedStream = keyStreams.get(streamId); - return isKeyedStream == null ? 
false : isKeyedStream; - } - - public int getParallelismNum() { - return parallelismNum; - } - - public void setParallelismNum(int num) { - parallelismNum = num; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java deleted file mode 100644 index d1308af..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import backtype.storm.topology.IRichBatchBolt; - -public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java deleted file mode 100644 index 2f77bfb..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import backtype.storm.topology.IRichSpout; - -public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java deleted file mode 100644 index 9d88c4d..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import java.io.Serializable; -import java.util.*; - -import avro.shaded.com.google.common.collect.Iterables; -import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals; -import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals; - -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.metric.MetricClient; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.SideInputHandler; -import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import 
com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.util.DefaultStepContext; -import com.alibaba.jstorm.beam.util.SerializedPipelineOptions; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -public class DoFnExecutor<InputT, OutputT> implements Executor { - private static final long serialVersionUID = 5297603063991078668L; - - private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); - - public class DoFnExecutorOutputManager implements OutputManager, Serializable { - private static final long serialVersionUID = -661113364735206170L; - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - protected transient DoFnRunner<InputT, OutputT> runner = null; - protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; - - protected final String stepName; - - protected int internalDoFnExecutorId; - - protected final String description; - - protected final TupleTag<OutputT> mainTupleTag; - protected final List<TupleTag<?>> sideOutputTags; - - protected SerializedPipelineOptions serializedOptions; - protected transient StormPipelineOptions pipelineOptions; - - protected DoFn<InputT, OutputT> doFn; - protected final Coder<WindowedValue<InputT>> inputCoder; - protected DoFnInvoker<InputT, OutputT> doFnInvoker; - protected OutputManager outputManager; - protected WindowingStrategy<?, ?> windowingStrategy; - protected final TupleTag<InputT> mainInputTag; - protected Collection<PCollectionView<?>> sideInputs; - protected SideInputHandler sideInputHandler; - protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; - - // Initialize during runtime - protected ExecutorContext executorContext; - protected ExecutorsBolt executorsBolt; - protected TimerInternals timerInternals; - protected transient StateInternals 
pushbackStateInternals; - protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; - protected transient StateTag<WatermarkHoldState> watermarkHoldTag; - protected transient IKvStoreManager kvStoreManager; - protected DefaultStepContext stepContext; - protected transient MetricClient metricClient; - - public DoFnExecutor( - String stepName, - String description, - StormPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, - Coder<WindowedValue<InputT>> inputCoder, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<InputT> mainInputTag, - Collection<PCollectionView<?>> sideInputs, - Map<TupleTag, PCollectionView<?>> sideInputTagToView, - TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags) { - this.stepName = checkNotNull(stepName, "stepName"); - this.description = checkNotNull(description, "description"); - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - this.doFn = doFn; - this.inputCoder = inputCoder; - this.outputManager = new DoFnExecutorOutputManager(); - this.windowingStrategy = windowingStrategy; - this.mainInputTag = mainInputTag; - this.sideInputs = sideInputs; - this.mainTupleTag = mainTupleTag; - this.sideOutputTags = sideOutputTags; - this.sideInputTagToView = sideInputTagToView; - } - - protected DoFnRunner<InputT, OutputT> getDoFnRunner() { - return new DoFnRunnerWithMetrics<>( - stepName, - DoFnRunners.simpleRunner( - this.pipelineOptions, - this.doFn, - this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler, - this.outputManager, - this.mainTupleTag, - this.sideOutputTags, - this.stepContext, - this.windowingStrategy), - MetricsReporter.create(metricClient)); - } - - protected void initService(ExecutorContext context) { - // TODO: what should be set for key in here? 
- timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService()); - kvStoreManager = context.getKvStoreManager(); - stepContext = new DefaultStepContext(timerInternals, - new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - metricClient = new MetricClient(executorContext.getTopologyContext()); - } - - @Override - public void init(ExecutorContext context) { - this.executorContext = context; - this.executorsBolt = context.getExecutorsBolt(); - this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class); - - initService(context); - - // Side inputs setup - if (sideInputs != null && sideInputs.isEmpty() == false) { - pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); - watermarkHoldTag = - StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); - pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); - sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); - runner = getDoFnRunner(); - pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); - } else { - runner = getDoFnRunner(); - } - - // Process user's setup - doFnInvoker = DoFnInvokers.invokerFor(doFn); - doFnInvoker.invokeSetup(); - } - - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", - tag, mainInputTag, sideInputs, elem.getValue())); - if (mainInputTag.equals(tag)) { - processMainInput(elem); - } else { - processSideInput(tag, elem); - } - } - - protected <T> void processMainInput(WindowedValue<T> elem) { - if (sideInputs.isEmpty()) { - runner.processElement((WindowedValue<InputT>) elem); - } else { - Iterable<WindowedValue<InputT>> justPushedBack = - 
pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : justPushedBack) { - if (pushedBackValue.getTimestamp().isBefore(min)) { - min = pushedBackValue.getTimestamp(); - } - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); - } - } - - protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); - - PCollectionView<?> sideInputView = sideInputTagToView.get(tag); - sideInputHandler.addSideInputValue(sideInputView, elem); - - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); - - Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); - if (pushedBackInputs != null) { - for (WindowedValue<InputT> input : pushedBackInputs) { - - Iterable<WindowedValue<InputT>> justPushedBack = - pushbackRunner.processElementInReadyWindows(input); - Iterables.addAll(newPushedBack, justPushedBack); - } - } - pushedBack.clear(); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : newPushedBack) { - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } - - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - // TODO: clear-then-add is not thread-safe. 
- watermarkHold.clear(); - watermarkHold.add(min); - } - - /** - * Process all pushed back elements when receiving watermark with max timestamp - */ - public void processAllPushBackElements() { - if (sideInputs != null && sideInputs.isEmpty() == false) { - BagState<WindowedValue<InputT>> pushedBackElements = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - if (pushedBackElements != null) { - for (WindowedValue<InputT> elem : pushedBackElements.read()) { - LOG.info("Process pushback elem={}", elem); - runner.processElement(elem); - } - pushedBackElements.clear(); - } - - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - watermarkHold.clear(); - watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); - } - } - - public void onTimer(Object key, TimerInternals.TimerData timerData) { - StateNamespace namespace = timerData.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); - BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); - if (pushbackRunner != null) { - pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); - } else { - runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); - } - } - - @Override - public void cleanup() { - doFnInvoker.invokeTeardown(); - } - - @Override - public String toString() { - return description; - } - - private Instant earlier(Instant left, Instant right) { - return left.isBefore(right) ? 
left : right; - } - - public void startBundle() { - if (pushbackRunner != null) { - pushbackRunner.startBundle(); - } else { - runner.startBundle(); - } - } - - public void finishBundle() { - if (pushbackRunner != null) { - pushbackRunner.finishBundle(); - } else { - runner.finishBundle(); - } - } - - public void setInternalDoFnExecutorId(int id) { - this.internalDoFnExecutorId = id; - } - - public int getInternalDoFnExecutorId() { - return internalDoFnExecutorId; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java deleted file mode 100644 index 105dffb..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.io.IOException; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsEnvironment; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.joda.time.Instant; - -/** - * DoFnRunner decorator which registers {@link MetricsContainer}. - */ -public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - - private final String stepName; - private final DoFnRunner<InputT, OutputT> delegate; - private final MetricsReporter metricsReporter; - - DoFnRunnerWithMetrics( - String stepName, - DoFnRunner<InputT, OutputT> delegate, - MetricsReporter metricsReporter) { - this.stepName = checkNotNull(stepName, "stepName"); - this.delegate = checkNotNull(delegate, "delegate"); - this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter"); - } - - @Override - public void startBundle() { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.startBundle(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.processElement(elem); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.onTimer(timerId, window, timestamp, 
timeDomain); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void finishBundle() { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.finishBundle(); - } catch (IOException e) { - throw new RuntimeException(e); - } - metricsReporter.updateMetrics(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java deleted file mode 100644 index 30348b2..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import java.io.Serializable; -import java.util.Map; - -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -import com.alibaba.jstorm.utils.Pair; - -public interface Executor extends Serializable { - /** - * Initialization during runtime - */ - void init(ExecutorContext context); - - <T> void process(TupleTag<T> tag, WindowedValue<T> elem); - - void cleanup(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java deleted file mode 100644 index 7f9aa77..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import backtype.storm.task.TopologyContext; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.google.auto.value.AutoValue; - -@AutoValue -public abstract class ExecutorContext { - public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) { - return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager); - } - - public abstract TopologyContext getTopologyContext(); - - public abstract ExecutorsBolt getExecutorsBolt(); - - public abstract IKvStoreManager getKvStoreManager(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java deleted file mode 100644 index ebd9456..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.runtime; - -import java.io.IOException; -import java.util.*; - -import avro.shaded.com.google.common.base.Joiner; -import avro.shaded.com.google.common.collect.Sets; -import backtype.storm.tuple.ITupleExt; -import backtype.storm.tuple.TupleImplExt; -import com.alibaba.jstorm.beam.translation.util.CommonInstance; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.cache.KvStoreIterable; -import com.alibaba.jstorm.cache.KvStoreManagerFactory; -import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.utils.KryoSerializer; -import com.alibaba.jstorm.window.Watermark; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class ExecutorsBolt extends AdaptorBasicBolt { - private static final long serialVersionUID = -7751043327801735211L; - - private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); - - protected ExecutorContext executorContext; - - protected TimerService timerService; - - // map from input tag to executor inside bolt - protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); - // set of all output tags that will be emit outside bolt - 
protected final Set<TupleTag> outputTags = Sets.newHashSet(); - protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); - protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); - protected int internalDoFnExecutorId = 1; - protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); - - protected OutputCollector collector; - - protected boolean isStatefulBolt = false; - - protected KryoSerializer<WindowedValue> serializer; - - public ExecutorsBolt() { - - } - - public void setStatefulBolt(boolean isStateful) { - isStatefulBolt = isStateful; - } - - public void addExecutor(TupleTag inputTag, Executor executor) { - inputTagToExecutor.put( - checkNotNull(inputTag, "inputTag"), - checkNotNull(executor, "executor")); - } - - public Map<TupleTag, Executor> getExecutors() { - return inputTagToExecutor; - } - - public void registerExecutor(Executor executor) { - if (executor instanceof DoFnExecutor) { - DoFnExecutor doFnExecutor = (DoFnExecutor) executor; - idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); - doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); - internalDoFnExecutorId++; - } - } - - public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { - return idToDoFnExecutor; - } - - public void addOutputTags(TupleTag tag) { - outputTags.add(tag); - } - - public void addExternalOutputTag(TupleTag<?> tag) { - externalOutputTags.add(tag); - } - - public Set<TupleTag> getOutputTags() { - return outputTags; - } - - public ExecutorContext getExecutorContext() { - return executorContext; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - LOG.info("Start to prepare for task-{}", context.getThisTaskId()); - try { - this.collector = collector; - - // init kv store manager - String storeName = String.format("task-%d", context.getThisTaskId()); - String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - 
IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) : - KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt); - this.executorContext = ExecutorContext.of(context, this, kvStoreManager); - - // init time service - timerService = initTimerService(); - - // init all internal executors - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.init(executorContext); - if (executor instanceof DoFnExecutor) { - doFnExecutors.add((DoFnExecutor) executor); - } - } - - this.serializer = new KryoSerializer<WindowedValue>(stormConf); - - LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values()); - LOG.info("inputTagToExecutor={}", inputTagToExecutor); - LOG.info("outputTags={}", outputTags); - LOG.info("externalOutputTags={}", externalOutputTags); - LOG.info("doFnExecutors={}", doFnExecutors); - } catch (IOException e) { - throw new RuntimeException("Failed to prepare executors bolt", e); - } - } - - public TimerService initTimerService() { - TopologyContext context = executorContext.getTopologyContext(); - List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) - .transformAndConcat( - new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { - @Override - public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { - if (Common.isSystemComponent(value.getKey())) { - return Collections.EMPTY_LIST; - } else { - return value.getValue(); - } - } - }) - .toList(); - TimerService ret = new TimerServiceImpl(executorContext); - ret.init(tasks); - return ret; - } - - @Override - public void execute(Tuple input) { - // process a batch - String streamId = input.getSourceStreamId(); - ITupleExt tuple = (ITupleExt) input; - Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); - if 
(CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { - while (valueIterator.hasNext()) { - processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); - } - } else { - doFnStartBundle(); - while (valueIterator.hasNext()) { - processElement(valueIterator.next(), streamId); - } - doFnFinishBundle(); - } - } - - private void processWatermark(long watermarkTs, int sourceTask) { - long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); - LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", - (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime()); - if (newWaterMark != 0) { - // Some buffer windows are going to be triggered. - doFnStartBundle(); - timerService.fireTimers(newWaterMark); - - // SideInput: If receiving water mark with max timestamp, It means no more data is supposed - // to be received from now on. So we are going to process all push back data. - if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.processAllPushBackElements(); - } - } - - doFnFinishBundle(); - } - - long currentWaterMark = timerService.currentOutputWatermark(); - if (!externalOutputTags.isEmpty()) { - collector.flush(); - collector.emit( - CommonInstance.BEAM_WATERMARK_STREAM_ID, - new Values(currentWaterMark)); - LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); - } - } - - private void processElement(List<Object> values, String streamId) { - TupleTag inputTag = new TupleTag(streamId); - WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); - processExecutorElem(inputTag, windowedValue); - } - - public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { - LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); - if (elem != null) { - Executor executor = inputTagToExecutor.get(inputTag); - if (executor != null) { 
- executor.process(inputTag, elem); - } - if (externalOutputTags.contains(inputTag)) { - emitOutsideBolt(inputTag, elem); - } - } else { - LOG.info("Received null elem for tag={}", inputTag); - } - } - - @Override - public void cleanup() { - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.cleanup(); - } - executorContext.getKvStoreManager().close(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - public TimerService timerService() { - return timerService; - } - - public void setTimerService(TimerService service) { - timerService = service; - } - - private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { - WindowedValue wv = null; - if (values.size() > 1) { - Object key = values.get(0); - WindowedValue value = serializer.deserialize((byte[]) values.get(1)); - wv = value.withValue(KV.of(key, value.getValue())); - } else { - wv = serializer.deserialize((byte[])values.get(0)); - } - return wv; - } - - protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { - LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); - if (keyedEmit(outputTag.getId())) { - KV kv = (KV) outputValue.getValue(); - byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); - // Convert WindowedValue<KV> to <K, WindowedValue<V>> - if (kv.getKey() == null) { - // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. 
- collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); - } else { - collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); - } - } else { - byte[] immutableOutputValue = serializer.serialize(outputValue); - collector.emit(outputTag.getId(), new Values(immutableOutputValue)); - } - } - - private void doFnStartBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.startBundle(); - } - } - - private void doFnFinishBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.finishBundle(); - } - } - - @Override - public String toString() { - // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); - List<String> ret = new ArrayList<>(); - /*ret.add("inputTags"); - for (TupleTag inputTag : inputTagToExecutor.keySet()) { - ret.add(inputTag.getId()); - }*/ - ret.add("internalExecutors"); - for (Executor executor : inputTagToExecutor.values()) { - ret.add(executor.toString()); - } - ret.add("externalOutputTags"); - for (TupleTag output : externalOutputTags) { - ret.add(output.getId()); - } - return Joiner.on('\n').join(ret).concat("\n"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java deleted file mode 100644 index 7158b2f..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.runtime; - -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class FlattenExecutor<InputT> implements Executor { - - private final String description; - private TupleTag mainOutputTag; - private ExecutorContext context; - private ExecutorsBolt executorsBolt; - - public FlattenExecutor(String description, TupleTag mainTupleTag) { - this.description = checkNotNull(description, "description"); - this.mainOutputTag = mainTupleTag; - } - - @Override - public void init(ExecutorContext context) { - this.context = context; - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - executorsBolt.processExecutorElem(mainOutputTag, elem); - } - - @Override - public void cleanup() {} - - @Override - public String toString() { - return description; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java 
b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java deleted file mode 100644 index 1958c77..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import java.io.Serializable; -import java.util.List; - -import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals; -import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsFactory; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.TimerInternalsFactory; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.TranslationContext.UserGraphContext; -import com.alibaba.jstorm.beam.translation.util.DefaultStepContext; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Preconditions.checkArgument; - 
-public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { - private static final long serialVersionUID = -7563050475488610553L; - - private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class); - - private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable { - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - private KvCoder<K, V> inputKvCoder; - private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; - - public GroupByWindowExecutor( - String stepName, - String description, - TranslationContext context, - StormPipelineOptions pipelineOptions, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) { - // The doFn will be created when runtime. Just pass "null" here - super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags); - - this.outputManager = new GroupByWindowOutputManager(); - UserGraphContext userGraphContext = context.getUserGraphContext(); - PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput(); - this.inputKvCoder = (KvCoder<K, V>) input.getCoder(); - } - - private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() { - final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() { - @Override - public StateInternals stateInternalsForKey(K key) { - return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); - } - }; - TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() { - @Override - public TimerInternals timerInternalsForKey(K key) { - return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService()); - } - }; - - 
reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder()); - DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn = - GroupAlsoByWindowViaWindowSetNewDoFn.create( - windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(), - (SystemReduceFn) reduceFn, outputManager, mainTupleTag); - return doFn; - } - - @Override - protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() { - doFn = getGroupByWindowDoFn(); - - DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner( - this.pipelineOptions, - this.doFn, - NullSideInputReader.empty(), - this.outputManager, - this.mainTupleTag, - this.sideOutputTags, - this.stepContext, - this.windowingStrategy); - - DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner( - simpleRunner, - this.stepContext, - this.windowingStrategy); - return new DoFnRunnerWithMetrics<>( - stepName, doFnRunner, MetricsReporter.create(metricClient)); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - /** - * For GroupByKey, KV type elem is received. We need to convert the KV elem - * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. 
- */ - KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); - runner.processElement(elem.withValue(keyedWorkItem)); - } - - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - StateNamespace namespace = timerData.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); - - runner.processElement( - WindowedValue.valueInGlobalWindow( - KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData)))); - } - - @Override - public String toString() { - return super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java deleted file mode 100644 index 33095b1..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; - -import com.alibaba.jstorm.common.metric.AsmCounter; -import com.alibaba.jstorm.common.metric.AsmGauge; -import com.alibaba.jstorm.common.metric.AsmHistogram; -import com.alibaba.jstorm.common.metric.AsmMeter; -import com.alibaba.jstorm.common.metric.AsmMetric; -import com.alibaba.jstorm.metric.MetricClient; -import com.google.common.collect.Maps; -import java.util.Map; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsFilter; -
/**
 * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
 *
 * <p>Only counters are reported. For each counter the last value pushed to JStorm is remembered,
 * and on the next {@link #updateMetrics()} call only the difference between the current attempted
 * value and that remembered value is added to the JStorm counter.
 */
public class MetricsReporter {

    private static final String METRIC_KEY_SEPARATOR = "__";
    private static final String COUNTER_PREFIX = "__counter";

    private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
    /** Attempted value most recently pushed to JStorm, keyed by the full metric name. */
    private final Map<String, Long> reportedCounters = Maps.newHashMap();
    private final MetricClient metricClient;

    /**
     * Creates a reporter that publishes through the given client.
     *
     * @param metricClient JStorm metric client; must not be {@code null}
     * @return a new reporter with an empty metrics container map
     */
    public static MetricsReporter create(MetricClient metricClient) {
        return new MetricsReporter(metricClient);
    }

    private MetricsReporter(MetricClient metricClient) {
        this.metricClient = checkNotNull(metricClient, "metricClient");
    }

    /**
     * Returns the metrics container of the given step, as provided by the underlying
     * {@link MetricsContainerStepMap}.
     *
     * @param stepName name of the step
     */
    public MetricsContainer getMetricsContainer(String stepName) {
        return metricsContainers.getContainer(stepName);
    }

    /** Queries all attempted metrics with an unrestricted filter and reports the counters. */
    public void updateMetrics() {
        MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
        MetricQueryResults metricQueryResults =
                metricResults.queryMetrics(MetricsFilter.builder().build());
        updateCounters(metricQueryResults.counters());
    }

    /**
     * Pushes the increment of every counter that has grown since it was last reported.
     *
     * @param counters current counter results; each attempted value is compared against the value
     *     remembered in {@code reportedCounters}
     */
    private void updateCounters(Iterable<MetricResult<Long>> counters) {
        for (MetricResult<Long> metricResult : counters) {
            String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
            Long updateValue = metricResult.attempted();
            Long oldValue = reportedCounters.get(metricName);

            if (oldValue == null || oldValue < updateValue) {
                AsmCounter counter = metricClient.registerCounter(metricName);
                Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
                counter.update(incValue);
                // Remember what has been reported; without this the full attempted value
                // would be added again on every call instead of only the delta.
                reportedCounters.put(metricName, updateValue);
            }
        }
    }

    /**
     * Builds the JStorm metric name as {@code prefix__step__namespace__name}.
     *
     * @param prefix metric-kind prefix, e.g. {@code COUNTER_PREFIX}
     * @param metricResult result whose step and metric name are encoded
     */
    private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
        return prefix
                + METRIC_KEY_SEPARATOR + metricResult.step()
                + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
                + METRIC_KEY_SEPARATOR + metricResult.name().name();
    }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java deleted file mode 100644 index bd3dfb3..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -
/**
 * A {@link DoFnExecutor} whose output manager rewrites output tags through a
 * local-to-external tag map before handing elements to the executors bolt.
 */
public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);

    /**
     * In the multi-output case the producer currently emits with a "local" tuple tag while
     * the downstream consumer uses a generated one. Each output is therefore translated from
     * its "local" tag to the "external" tag before it is emitted. See PCollectionTuple for
     * details.
     */
    public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
        @Override
        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            // Tags without a mapping are emitted unchanged.
            TupleTag<T> targetTag = tag;
            if (localTupleTagMap.containsKey(tag)) {
                targetTag = (TupleTag<T>) localTupleTagMap.get(tag);
            }
            executorsBolt.processExecutorElem(targetTag, output);
        }
    }

    protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;

    /**
     * Creates the executor and installs the tag-mapping output manager.
     *
     * <p>All arguments except {@code localTupleTagMap} are passed through to
     * {@link DoFnExecutor}.
     *
     * @param localTupleTagMap mapping from local output tags to the tags used downstream
     */
    public MultiOutputDoFnExecutor(
            String stepName,
            String description,
            StormPipelineOptions pipelineOptions,
            DoFn<InputT, OutputT> doFn,
            Coder<WindowedValue<InputT>> inputCoder,
            WindowingStrategy<?, ?> windowingStrategy,
            TupleTag<InputT> mainInputTag,
            Collection<PCollectionView<?>> sideInputs,
            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
            TupleTag<OutputT> mainTupleTag,
            List<TupleTag<?>> sideOutputTags,
            Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
    ) {
        super(
                stepName,
                description,
                pipelineOptions,
                doFn,
                inputCoder,
                windowingStrategy,
                mainInputTag,
                sideInputs,
                sideInputTagToView,
                mainTupleTag,
                sideOutputTags);
        this.localTupleTagMap = localTupleTagMap;
        this.outputManager = new MultiOutputDoFnExecutorOutputManager();
        LOG.info("localTupleTagMap: {}", localTupleTagMap);
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java deleted file mode 100644 index 51aa960..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals; -import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -
/**
 * A {@link MultiOutputDoFnExecutor} over {@link KV} input that installs key-scoped timer and
 * state internals on the step context before a main-input element is processed, and key-scoped
 * state internals before a timer is fired.
 */
public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {

    /** Passes every argument through to {@link MultiOutputDoFnExecutor} unchanged. */
    public MultiStatefulDoFnExecutor(
            String stepName, String description,
            StormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
            Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
            TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
            Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
            List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
        super(
                stepName,
                description,
                pipelineOptions,
                doFn,
                inputCoder,
                windowingStrategy,
                mainInputTag,
                sideInputs,
                sideInputTagToView,
                mainTupleTag,
                sideOutputTags,
                localTupleTagMap);
    }

    @Override
    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
        // Anything that is not tagged as the main input is treated as a side input.
        if (!mainInputTag.equals(tag)) {
            processSideInput(tag, elem);
            return;
        }

        // Bind timer internals first, then state internals, to the key of the element.
        Object elementKey = ((WindowedValue<KV>) elem).getValue().getKey();
        stepContext.setTimerInternals(
                new JStormTimerInternals(
                        elementKey, this, executorContext.getExecutorsBolt().timerService()));
        stepContext.setStateInternals(
                new JStormStateInternals<>(
                        elementKey, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
        processMainInput(elem);
    }

    @Override
    public void onTimer(Object key, TimerInternals.TimerData timerData) {
        // Bind state internals to the timer's key before delegating to the parent.
        stepContext.setStateInternals(
                new JStormStateInternals<>(
                        key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
        super.onTimer(key, timerData);
    }
}
