jstorm-runner: move most classes to translation package and reduce their visibility to package-private.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82653534 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82653534 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82653534 Branch: refs/heads/jstorm-runner Commit: 82653534b0b738ee84ed94a67f9344393778d033 Parents: 9309ac4 Author: Pei He <[email protected]> Authored: Fri Jul 14 15:28:53 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:57 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunner.java | 14 +- .../jstorm/serialization/package-info.java | 22 ++ .../jstorm/translation/AbstractComponent.java | 67 ++++ .../translation/BoundedSourceTranslator.java | 49 +++ .../jstorm/translation/CommonInstance.java | 28 ++ .../jstorm/translation/DefaultStepContext.java | 90 +++++ .../jstorm/translation/DoFnExecutor.java | 339 +++++++++++++++++ .../translation/DoFnRunnerWithMetrics.java | 91 +++++ .../runners/jstorm/translation/Executor.java | 36 ++ .../jstorm/translation/ExecutorContext.java | 41 ++ .../jstorm/translation/ExecutorsBolt.java | 338 +++++++++++++++++ .../jstorm/translation/FlattenExecutor.java | 60 +++ .../jstorm/translation/FlattenTranslator.java | 49 +++ .../translation/GroupByKeyTranslator.java | 71 ++++ .../translation/GroupByWindowExecutor.java | 173 +++++++++ .../jstorm/translation/JStormBagState.java | 180 +++++++++ .../translation/JStormCombiningState.java | 88 +++++ .../jstorm/translation/JStormMapState.java | 158 ++++++++ .../translation/JStormPipelineTranslator.java | 2 - .../translation/JStormStateInternals.java | 190 ++++++++++ .../translation/JStormTimerInternals.java | 97 +++++ .../jstorm/translation/JStormValueState.java | 82 ++++ .../translation/JStormWatermarkHoldState.java | 82 ++++ .../jstorm/translation/MetricsReporter.java | 87 +++++ .../translation/MultiOutputDoFnExecutor.java | 79 ++++ 
.../translation/MultiStatefulDoFnExecutor.java | 70 ++++ .../translation/ParDoBoundMultiTranslator.java | 114 ++++++ .../translation/ParDoBoundTranslator.java | 107 ++++++ .../runners/jstorm/translation/RunnerUtils.java | 51 +++ .../translation/SerializedPipelineOptions.java | 65 ++++ .../translation/SingletonKeyedWorkItem.java | 62 +++ .../translation/StatefulDoFnExecutor.java | 68 ++++ .../beam/runners/jstorm/translation/Stream.java | 104 +++++ .../jstorm/translation/TimerService.java | 51 +++ .../jstorm/translation/TimerServiceImpl.java | 155 ++++++++ .../jstorm/translation/TransformTranslator.java | 79 ++++ .../jstorm/translation/TranslationContext.java | 6 - .../jstorm/translation/TranslatorRegistry.java | 11 +- .../jstorm/translation/TxExecutorsBolt.java | 133 +++++++ .../translation/TxUnboundedSourceSpout.java | 156 ++++++++ .../translation/UnboundedSourceSpout.java | 189 +++++++++ .../translation/UnboundedSourceTranslator.java | 44 +++ .../jstorm/translation/ViewExecutor.java | 56 +++ .../jstorm/translation/ViewTranslator.java | 378 ++++++++++++++++++ .../translation/WindowAssignExecutor.java | 112 ++++++ .../translation/WindowAssignTranslator.java | 41 ++ .../jstorm/translation/package-info.java | 22 ++ .../translation/runtime/AbstractComponent.java | 68 ---- .../translation/runtime/DoFnExecutor.java | 343 ----------------- .../runtime/DoFnRunnerWithMetrics.java | 91 ----- .../jstorm/translation/runtime/Executor.java | 36 -- .../translation/runtime/ExecutorContext.java | 41 -- .../translation/runtime/ExecutorsBolt.java | 339 ----------------- .../translation/runtime/FlattenExecutor.java | 60 --- .../runtime/GroupByWindowExecutor.java | 177 --------- .../translation/runtime/MetricsReporter.java | 87 ----- .../runtime/MultiOutputDoFnExecutor.java | 79 ---- .../runtime/MultiStatefulDoFnExecutor.java | 72 ---- .../runtime/StatefulDoFnExecutor.java | 70 ---- .../translation/runtime/TimerService.java | 51 --- .../translation/runtime/TimerServiceImpl.java | 155 
-------- .../translation/runtime/TxExecutorsBolt.java | 133 ------- .../runtime/TxUnboundedSourceSpout.java | 156 -------- .../runtime/UnboundedSourceSpout.java | 191 ---------- .../translation/runtime/ViewExecutor.java | 56 --- .../runtime/WindowAssignExecutor.java | 112 ------ .../runtime/state/JStormBagState.java | 180 --------- .../runtime/state/JStormCombiningState.java | 88 ----- .../runtime/state/JStormMapState.java | 158 -------- .../runtime/state/JStormStateInternals.java | 191 ---------- .../runtime/state/JStormValueState.java | 82 ---- .../runtime/state/JStormWatermarkHoldState.java | 83 ---- .../runtime/timer/JStormTimerInternals.java | 100 ----- .../translator/BoundedSourceTranslator.java | 51 --- .../translator/FlattenTranslator.java | 51 --- .../translator/GroupByKeyTranslator.java | 73 ---- .../translator/ParDoBoundMultiTranslator.java | 118 ------ .../translator/ParDoBoundTranslator.java | 110 ------ .../jstorm/translation/translator/Stream.java | 104 ----- .../translator/TransformTranslator.java | 80 ---- .../translator/UnboundedSourceTranslator.java | 46 --- .../translation/translator/ViewTranslator.java | 380 ------------------- .../translator/WindowAssignTranslator.java | 43 --- .../jstorm/translation/util/CommonInstance.java | 28 -- .../translation/util/DefaultStepContext.java | 90 ----- .../beam/runners/jstorm/util/RunnerUtils.java | 55 --- .../jstorm/util/SerializedPipelineOptions.java | 65 ---- .../jstorm/util/SingletonKeyedWorkItem.java | 62 --- .../translation/JStormStateInternalsTest.java | 221 +++++++++++ .../runtime/state/JStormStateInternalsTest.java | 222 ----------- 90 files changed, 4783 insertions(+), 4802 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 8782130..baf4e5a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -38,15 +38,15 @@ import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer; +import org.apache.beam.runners.jstorm.translation.AbstractComponent; +import org.apache.beam.runners.jstorm.translation.CommonInstance; +import org.apache.beam.runners.jstorm.translation.ExecutorsBolt; import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator; +import org.apache.beam.runners.jstorm.translation.Stream; import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent; -import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; -import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt; -import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout; -import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; -import org.apache.beam.runners.jstorm.translation.translator.Stream; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import org.apache.beam.runners.jstorm.translation.TxExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.TxUnboundedSourceSpout; +import org.apache.beam.runners.jstorm.translation.UnboundedSourceSpout; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; 
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java new file mode 100644 index 0000000..f5ac931 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of kryo serializers. 
+ */ +package org.apache.beam.runners.jstorm.serialization; http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java new file mode 100644 index 0000000..35ae88d --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import backtype.storm.topology.IComponent; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import java.util.HashMap; +import java.util.Map; + +/** + * Enable user to add output stream definitions by API, rather than hard-code. 
+ */ +public abstract class AbstractComponent implements IComponent { + private Map<String, Fields> streamToFields = new HashMap<>(); + private Map<String, Boolean> keyStreams = new HashMap<>(); + private int parallelismNum = 0; + + public void addOutputField(String streamId) { + addOutputField(streamId, new Fields(CommonInstance.VALUE)); + } + + public void addOutputField(String streamId, Fields fields) { + streamToFields.put(streamId, fields); + keyStreams.put(streamId, false); + } + + public void addKVOutputField(String streamId) { + streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); + keyStreams.put(streamId, true); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { + declarer.declareStream(entry.getKey(), entry.getValue()); + } + } + + public boolean keyedEmit(String streamId) { + Boolean isKeyedStream = keyStreams.get(streamId); + return isKeyedStream == null ? false : isKeyedStream; + } + + public int getParallelismNum() { + return parallelismNum; + } + + public void setParallelismNum(int num) { + parallelismNum = num; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java new file mode 100644 index 0000000..f64193e --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Translates a {@link Read.Bounded} into a Storm spout. 
+ * + * @param <T> + */ +class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { + + @Override + public void translateNode(Read.Bounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + TupleTag<?> outputTag = userGraphContext.getOutputTag(); + PValue outputValue = userGraphContext.getOutput(); + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), + userGraphContext.getOptions(), outputTag); + + context.getExecutionGraphContext().registerSpout( + spout, TaggedPValue.of(outputTag, outputValue)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java new file mode 100644 index 0000000..b7154cd --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +/** + * Common definition of JStorm runner. + */ +public class CommonInstance { + public static final String KEY = "Key"; + public static final String VALUE = "Value"; + + public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java new file mode 100644 index 0000000..9fd584b --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Default StepContext for running DoFn This does not allow accessing state or timer internals. 
+ */ +class DefaultStepContext implements ExecutionContext.StepContext { + + private TimerInternals timerInternals; + + private StateInternals stateInternals; + + public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { + this.timerInternals = checkNotNull(timerInternals, "timerInternals"); + this.stateInternals = checkNotNull(stateInternals, "stateInternals"); + } + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue<?> windowedValue) { + + } + + @Override + public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) { + + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) + throws IOException { + throw new UnsupportedOperationException("Writing side-input data is not supported."); + } + + @Override + public StateInternals stateInternals() { + return stateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + public void setStateInternals(StateInternals stateInternals) { + this.stateInternals = stateInternals; + } + + public void setTimerInternals(TimerInternals timerInternals) { + this.timerInternals = timerInternals; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java new file mode 100644 index 0000000..fdd9af6 --- /dev/null +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.metric.MetricClient; +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import 
org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JStorm {@link Executor} for {@link DoFn}. + * @param <InputT> input type + * @param <OutputT> output type + */ +class DoFnExecutor<InputT, OutputT> implements Executor { + private static final long serialVersionUID = 5297603063991078668L; + + private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); + + /** + * Implements {@link OutputManager} in a DoFn executor. 
+ */ + public class DoFnExecutorOutputManager implements OutputManager, Serializable { + private static final long serialVersionUID = -661113364735206170L; + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + executorsBolt.processExecutorElem(tag, output); + } + } + + protected transient DoFnRunner<InputT, OutputT> runner = null; + protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; + + protected final String stepName; + + protected int internalDoFnExecutorId; + + protected final String description; + + protected final TupleTag<OutputT> mainTupleTag; + protected final List<TupleTag<?>> sideOutputTags; + + protected SerializedPipelineOptions serializedOptions; + protected transient JStormPipelineOptions pipelineOptions; + + protected DoFn<InputT, OutputT> doFn; + protected final Coder<WindowedValue<InputT>> inputCoder; + protected DoFnInvoker<InputT, OutputT> doFnInvoker; + protected OutputManager outputManager; + protected WindowingStrategy<?, ?> windowingStrategy; + protected final TupleTag<InputT> mainInputTag; + protected Collection<PCollectionView<?>> sideInputs; + protected SideInputHandler sideInputHandler; + protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; + + // Initialize during runtime + protected ExecutorContext executorContext; + protected ExecutorsBolt executorsBolt; + protected TimerInternals timerInternals; + protected transient StateInternals pushbackStateInternals; + protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; + protected transient StateTag<WatermarkHoldState> watermarkHoldTag; + protected transient IKvStoreManager kvStoreManager; + protected DefaultStepContext stepContext; + protected transient MetricClient metricClient; + + public DoFnExecutor( + String stepName, + String description, + JStormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> 
windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags) { + this.stepName = checkNotNull(stepName, "stepName"); + this.description = checkNotNull(description, "description"); + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.doFn = doFn; + this.inputCoder = inputCoder; + this.outputManager = new DoFnExecutorOutputManager(); + this.windowingStrategy = windowingStrategy; + this.mainInputTag = mainInputTag; + this.sideInputs = sideInputs; + this.mainTupleTag = mainTupleTag; + this.sideOutputTags = sideOutputTags; + this.sideInputTagToView = sideInputTagToView; + } + + protected DoFnRunner<InputT, OutputT> getDoFnRunner() { + return new DoFnRunnerWithMetrics<>( + stepName, + DoFnRunners.simpleRunner( + this.pipelineOptions, + this.doFn, + this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler, + this.outputManager, + this.mainTupleTag, + this.sideOutputTags, + this.stepContext, + this.windowingStrategy), + MetricsReporter.create(metricClient)); + } + + protected void initService(ExecutorContext context) { + // TODO: what should be set for key in here? 
+ timerInternals = new JStormTimerInternals( + null /* key */, this, context.getExecutorsBolt().timerService()); + kvStoreManager = context.getKvStoreManager(); + stepContext = new DefaultStepContext(timerInternals, + new JStormStateInternals( + null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + metricClient = new MetricClient(executorContext.getTopologyContext()); + } + + @Override + public void init(ExecutorContext context) { + this.executorContext = context; + this.executorsBolt = context.getExecutorsBolt(); + this.pipelineOptions = + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + + initService(context); + + // Side inputs setup + if (sideInputs != null && !sideInputs.isEmpty()) { + pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); + watermarkHoldTag = + StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); + pushbackStateInternals = new JStormStateInternals( + null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); + sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); + runner = getDoFnRunner(); + pushbackRunner = + SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); + } else { + runner = getDoFnRunner(); + } + + // Process user's setup + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", + tag, mainInputTag, sideInputs, elem.getValue())); + if (mainInputTag.equals(tag)) { + processMainInput(elem); + } else { + processSideInput(tag, elem); + } + } + + protected <T> void processMainInput(WindowedValue<T> elem) { + if (sideInputs.isEmpty()) { + runner.processElement((WindowedValue<InputT>) elem); + } else { + Iterable<WindowedValue<InputT>> justPushedBack = + 
pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + if (pushedBackValue.getTimestamp().isBefore(min)) { + min = pushedBackValue.getTimestamp(); + } + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); + } + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); + } + } + + protected void processSideInput(TupleTag tag, WindowedValue elem) { + LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); + + PCollectionView<?> sideInputView = sideInputTagToView.get(tag); + sideInputHandler.addSideInputValue(sideInputView, elem); + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + + Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); + if (pushedBackInputs != null) { + for (WindowedValue<InputT> input : pushedBackInputs) { + + Iterable<WindowedValue<InputT>> justPushedBack = + pushbackRunner.processElementInReadyWindows(input); + Iterables.addAll(newPushedBack, justPushedBack); + } + } + pushedBack.clear(); + + Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + min = earlier(min, pushedBackValue.getTimestamp()); + pushedBack.add(pushedBackValue); + } + + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + // TODO: clear-then-add is not thread-safe. + watermarkHold.clear(); + watermarkHold.add(min); + } + + /** + * Process all pushed back elements when receiving watermark with max timestamp. 
+ */ + public void processAllPushBackElements() { + if (sideInputs != null && !sideInputs.isEmpty()) { + BagState<WindowedValue<InputT>> pushedBackElements = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + if (pushedBackElements != null) { + for (WindowedValue<InputT> elem : pushedBackElements.read()) { + LOG.info("Process pushback elem={}", elem); + runner.processElement(elem); + } + pushedBackElements.clear(); + } + + WatermarkHoldState watermarkHold = + pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); + watermarkHold.clear(); + watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + } + + public void onTimer(Object key, TimerInternals.TimerData timerData) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + if (pushbackRunner != null) { + pushbackRunner.onTimer( + timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } else { + runner.onTimer( + timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } + } + + @Override + public void cleanup() { + doFnInvoker.invokeTeardown(); + } + + @Override + public String toString() { + return description; + } + + private Instant earlier(Instant left, Instant right) { + return left.isBefore(right) ? 
left : right; + } + + public void startBundle() { + if (pushbackRunner != null) { + pushbackRunner.startBundle(); + } else { + runner.startBundle(); + } + } + + public void finishBundle() { + if (pushbackRunner != null) { + pushbackRunner.finishBundle(); + } else { + runner.finishBundle(); + } + } + + public void setInternalDoFnExecutorId(int id) { + this.internalDoFnExecutorId = id; + } + + public int getInternalDoFnExecutorId() { + return internalDoFnExecutorId; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java new file mode 100644 index 0000000..f614f1c --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; + +/** + * DoFnRunner decorator which registers {@link MetricsContainer}. + */ +class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + + private final String stepName; + private final DoFnRunner<InputT, OutputT> delegate; + private final MetricsReporter metricsReporter; + + DoFnRunnerWithMetrics( + String stepName, + DoFnRunner<InputT, OutputT> delegate, + MetricsReporter metricsReporter) { + this.stepName = checkNotNull(stepName, "stepName"); + this.delegate = checkNotNull(delegate, "delegate"); + this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter"); + } + + @Override + public void startBundle() { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.startBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void processElement(WindowedValue<InputT> elem) { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.processElement(elem); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onTimer( + String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.onTimer(timerId, window, timestamp, 
timeDomain); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void finishBundle() { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { + delegate.finishBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + metricsReporter.updateMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java new file mode 100644 index 0000000..145b224 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import java.io.Serializable; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * An executor is a basic executable unit in a JStorm task. + */ +interface Executor extends Serializable { + /** + * Initialization during runtime. + */ + void init(ExecutorContext context); + + <T> void process(TupleTag<T> tag, WindowedValue<T> elem); + + void cleanup(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java new file mode 100644 index 0000000..487db35 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.google.auto.value.AutoValue;
+
+/**
+ * Runtime context of an executors bolt.
+ *
+ * <p>Bundles the topology context, the hosting {@link ExecutorsBolt}, and its kv store manager.
+ * The bolt passes one instance to each of its executors' {@code init}.
+ */
+@AutoValue
+abstract class ExecutorContext {
+  /**
+   * Creates a context.
+   *
+   * @param topologyContext JStorm topology context of the running task
+   * @param bolt the bolt hosting the executors
+   * @param kvStoreManager state store manager created for the task
+   */
+  public static ExecutorContext of(
+      TopologyContext topologyContext,
+      ExecutorsBolt bolt,
+      IKvStoreManager kvStoreManager) {
+    // AutoValue orders the generated constructor's parameters by the declaration order of the
+    // abstract getters below; keep these arguments and those getters in the same order.
+    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+  }
+
+  public abstract TopologyContext getTopologyContext();
+
+  public abstract ExecutorsBolt getExecutorsBolt();
+
+  public abstract IKvStoreManager getKvStoreManager();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
new file mode 100644
index 0000000..ef12db8
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBatchBolt; +import backtype.storm.tuple.ITupleExt; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.KvStoreManagerFactory; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.utils.KryoSerializer; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG. 
+ */ +public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { + private static final long serialVersionUID = -7751043327801735211L; + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); + + protected ExecutorContext executorContext; + + protected TimerService timerService; + + // map from input tag to executor inside bolt + protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); + // set of all output tags that will be emit outside bolt + protected final Set<TupleTag> outputTags = Sets.newHashSet(); + protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); + protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); + protected int internalDoFnExecutorId = 1; + protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); + + protected OutputCollector collector; + + protected boolean isStatefulBolt = false; + + protected KryoSerializer<WindowedValue> serializer; + + public ExecutorsBolt() { + + } + + public void setStatefulBolt(boolean isStateful) { + isStatefulBolt = isStateful; + } + + public void addExecutor(TupleTag inputTag, Executor executor) { + inputTagToExecutor.put( + checkNotNull(inputTag, "inputTag"), + checkNotNull(executor, "executor")); + } + + public Map<TupleTag, Executor> getExecutors() { + return inputTagToExecutor; + } + + public void registerExecutor(Executor executor) { + if (executor instanceof DoFnExecutor) { + DoFnExecutor doFnExecutor = (DoFnExecutor) executor; + idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); + doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); + internalDoFnExecutorId++; + } + } + + public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { + return idToDoFnExecutor; + } + + public void addOutputTags(TupleTag tag) { + outputTags.add(tag); + } + + public void addExternalOutputTag(TupleTag<?> tag) { + externalOutputTags.add(tag); + } + + public Set<TupleTag> getOutputTags() { + 
return outputTags; + } + + public ExecutorContext getExecutorContext() { + return executorContext; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + LOG.info("Start to prepare for task-{}", context.getThisTaskId()); + try { + this.collector = collector; + + // init kv store manager + String storeName = String.format("task-%d", context.getThisTaskId()); + String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); + IKvStoreManager kvStoreManager = isStatefulBolt + ? KvStoreManagerFactory.getKvStoreManagerWithMonitor( + context, storeName, stateStorePath, isStatefulBolt) + : KvStoreManagerFactory.getKvStoreManager( + stormConf, storeName, stateStorePath, isStatefulBolt); + this.executorContext = ExecutorContext.of(context, this, kvStoreManager); + + // init time service + timerService = initTimerService(); + + // init all internal executors + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.init(executorContext); + if (executor instanceof DoFnExecutor) { + doFnExecutors.add((DoFnExecutor) executor); + } + } + + this.serializer = new KryoSerializer<WindowedValue>(stormConf); + + LOG.info("ExecutorsBolt finished init. 
LocalExecutors={}", inputTagToExecutor.values()); + LOG.info("inputTagToExecutor={}", inputTagToExecutor); + LOG.info("outputTags={}", outputTags); + LOG.info("externalOutputTags={}", externalOutputTags); + LOG.info("doFnExecutors={}", doFnExecutors); + } catch (IOException e) { + throw new RuntimeException("Failed to prepare executors bolt", e); + } + } + + public TimerService initTimerService() { + TopologyContext context = executorContext.getTopologyContext(); + List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) + .transformAndConcat( + new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { + @Override + public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { + if (Common.isSystemComponent(value.getKey())) { + return Collections.EMPTY_LIST; + } else { + return value.getValue(); + } + } + }) + .toList(); + TimerService ret = new TimerServiceImpl(executorContext); + ret.init(tasks); + return ret; + } + + @Override + public void execute(Tuple input) { + // process a batch + String streamId = input.getSourceStreamId(); + ITupleExt tuple = (ITupleExt) input; + Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); + if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { + while (valueIterator.hasNext()) { + processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); + } + } else { + doFnStartBundle(); + while (valueIterator.hasNext()) { + processElement(valueIterator.next(), streamId); + } + doFnFinishBundle(); + } + } + + private void processWatermark(long watermarkTs, int sourceTask) { + long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); + LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", + (new Instant(watermarkTs)).toDateTime(), + sourceTask, + (new Instant(newWaterMark)).toDateTime()); + if (newWaterMark != 0) { + // Some buffer windows are going to be triggered. 
+ doFnStartBundle(); + timerService.fireTimers(newWaterMark); + + // SideInput: If receiving water mark with max timestamp, It means no more data is supposed + // to be received from now on. So we are going to process all push back data. + if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.processAllPushBackElements(); + } + } + + doFnFinishBundle(); + } + + long currentWaterMark = timerService.currentOutputWatermark(); + if (!externalOutputTags.isEmpty()) { + collector.flush(); + collector.emit( + CommonInstance.BEAM_WATERMARK_STREAM_ID, + new Values(currentWaterMark)); + LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); + } + } + + private void processElement(List<Object> values, String streamId) { + TupleTag inputTag = new TupleTag(streamId); + WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); + processExecutorElem(inputTag, windowedValue); + } + + public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { + LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); + if (elem != null) { + Executor executor = inputTagToExecutor.get(inputTag); + if (executor != null) { + executor.process(inputTag, elem); + } + if (externalOutputTags.contains(inputTag)) { + emitOutsideBolt(inputTag, elem); + } + } else { + LOG.info("Received null elem for tag={}", inputTag); + } + } + + @Override + public void cleanup() { + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.cleanup(); + } + executorContext.getKvStoreManager().close(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + public TimerService timerService() { + return timerService; + } + + public void setTimerService(TimerService service) { + timerService = service; + } + + private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { + 
WindowedValue wv = null; + if (values.size() > 1) { + Object key = values.get(0); + WindowedValue value = serializer.deserialize((byte[]) values.get(1)); + wv = value.withValue(KV.of(key, value.getValue())); + } else { + wv = serializer.deserialize((byte[]) values.get(0)); + } + return wv; + } + + protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { + LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); + if (keyedEmit(outputTag.getId())) { + KV kv = (KV) outputValue.getValue(); + byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); + // Convert WindowedValue<KV> to <K, WindowedValue<V>> + if (kv.getKey() == null) { + // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. + collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); + } else { + collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); + } + } else { + byte[] immutableOutputValue = serializer.serialize(outputValue); + collector.emit(outputTag.getId(), new Values(immutableOutputValue)); + } + } + + private void doFnStartBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.startBundle(); + } + } + + private void doFnFinishBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.finishBundle(); + } + } + + @Override + public String toString() { + // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); + List<String> ret = new ArrayList<>(); + /*ret.add("inputTags"); + for (TupleTag inputTag : inputTagToExecutor.keySet()) { + ret.add(inputTag.getId()); + }*/ + ret.add("internalExecutors"); + for (Executor executor : inputTagToExecutor.values()) { + ret.add(executor.toString()); + } + ret.add("externalOutputTags"); + for (TupleTag output : externalOutputTags) { + ret.add(output.getId()); + } + return Joiner.on('\n').join(ret).concat("\n"); + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java new file mode 100644 index 0000000..a64f494 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}. 
+ * @param <InputT> + */ +class FlattenExecutor<InputT> implements Executor { + + private final String description; + private TupleTag mainOutputTag; + private ExecutorContext context; + private ExecutorsBolt executorsBolt; + + public FlattenExecutor(String description, TupleTag mainTupleTag) { + this.description = checkNotNull(description, "description"); + this.mainOutputTag = mainTupleTag; + } + + @Override + public void init(ExecutorContext context) { + this.context = context; + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + executorsBolt.processExecutorElem(mainOutputTag, elem); + } + + @Override + public void cleanup() { + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java new file mode 100644 index 0000000..89708df --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}. + * @param <V> + */ +class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> { + + @Override + public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + // Since a new tag is created in PCollectionList, retrieve the real tag here. 
+ Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); + for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { + PCollection<V> pc = (PCollection<V>) entry.getValue(); + inputs.putAll(pc.expand()); + } + System.out.println("Real inputs: " + inputs); + System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); + String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); + FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java new file mode 100644 index 0000000..85c958a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}. + * @param <K> + * @param <V> + */ +class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> { + // information of transform + protected PCollection<KV<K, V>> input; + protected PCollection<KV<K, Iterable<V>>> output; + protected List<TupleTag<?>> inputTags; + protected TupleTag<KV<K, Iterable<V>>> mainOutputTag; + protected List<TupleTag<?>> sideOutputTags; + protected List<PCollectionView<?>> sideInputs; + protected WindowingStrategy<?, ?> windowingStrategy; + + @Override + public void translateNode(GroupByKey<K, V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + input = (PCollection<KV<K, V>>) userGraphContext.getInput(); + output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput(); + + inputTags = userGraphContext.getInputTags(); + mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag(); + sideOutputTags = Lists.newArrayList(); + + sideInputs = Collections.<PCollectionView<?>>emptyList(); + windowingStrategy = input.getWindowingStrategy(); + + GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>( + userGraphContext.getStepName(), + description, + 
context, + context.getUserGraphContext().getOptions(), + windowingStrategy, + mainOutputTag, + sideOutputTags); + context.addTransformExecutor(groupByWindowExecutor); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java new file mode 100644 index 0000000..bf6e1ad --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
+ *
+ * <p>Each incoming {@code KV<K, V>} element is converted into a {@link KeyedWorkItem} and fed to
+ * a {@link GroupAlsoByWindowViaWindowSetNewDoFn} built from a buffering {@link SystemReduceFn},
+ * whose per-key state and timers are provided by {@link JStormStateInternals} and
+ * {@link JStormTimerInternals}.
+ *
+ * @param <K> key type of the grouped {@code KV} elements
+ * @param <V> value type of the grouped {@code KV} elements
+ */
+class GroupByWindowExecutor<K, V>
+    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+  private static final long serialVersionUID = -7563050475488610553L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
+
+  /** Forwards every output of the wrapped {@link DoFn} to {@code executorsBolt}. */
+  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
+    }
+  }
+
+  private KvCoder<K, V> inputKvCoder;
+  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+  /**
+   * Creates the executor. The {@link DoFn} is only created at runtime (see
+   * {@link #getDoFnRunner()}), so {@code null} placeholders are passed to the super constructor.
+   *
+   * @param context translation context; its current input is assumed to be a
+   *     {@code PCollection<KV<K, V>>} encoded with a {@link KvCoder}
+   */
+  @SuppressWarnings("unchecked")
+  public GroupByWindowExecutor(
+      String stepName,
+      String description,
+      TranslationContext context,
+      JStormPipelineOptions pipelineOptions,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+    // The doFn will be created when runtime. Just pass "null" here
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        null,
+        null,
+        windowingStrategy,
+        null,
+        null,
+        null,
+        mainTupleTag,
+        sideOutputTags);
+
+    this.outputManager = new GroupByWindowOutputManager();
+    UserGraphContext userGraphContext = context.getUserGraphContext();
+    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+    this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
+  }
+
+  /**
+   * Builds the group-also-by-window {@link DoFn}, wiring JStorm-backed state and timer internals
+   * for each key and a buffering {@link SystemReduceFn} for the input value coder.
+   */
+  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+    final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
+      @Override
+      public StateInternals stateInternalsForKey(K key) {
+        return new JStormStateInternals<K>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      }
+    };
+    TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        return new JStormTimerInternals<>(
+            key,
+            GroupByWindowExecutor.this,
+            executorContext.getExecutorsBolt().timerService());
+      }
+    };
+
+    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+    return GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
+        (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
+  }
+
+  /**
+   * Creates the group-also-by-window {@link DoFn} and wraps it in a simple runner, a late-data
+   * dropping runner and a metrics-reporting runner, in that order.
+   */
+  @Override
+  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+    doFn = getGroupByWindowDoFn();
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner(
+        this.pipelineOptions,
+        this.doFn,
+        NullSideInputReader.empty(),
+        this.outputManager,
+        this.mainTupleTag,
+        this.sideOutputTags,
+        this.stepContext,
+        this.windowingStrategy);
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner =
+        DoFnRunners.lateDataDroppingRunner(
+            simpleRunner,
+            this.stepContext,
+            this.windowingStrategy);
+    return new DoFnRunnerWithMetrics<>(
+        stepName, doFnRunner, MetricsReporter.create(metricClient));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void process(TupleTag tag, WindowedValue elem) {
+    // For GroupByKey, KV elements are received. They must be converted into KeyedWorkItems
+    // first, which is the input type expected by the LateDataDroppingDoFnRunner.
+    KeyedWorkItem<K, V> keyedWorkItem =
+        RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+    runner.processElement(elem.withValue(keyedWorkItem));
+  }
+
+  /**
+   * Delivers a fired timer to the runner as a timers-only {@link KeyedWorkItem} in the global
+   * window.
+   *
+   * @throws IllegalArgumentException if the timer is not scoped to a window namespace
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace namespace = timerData.getNamespace();
+    // Only window-scoped timers are supported here.
+    checkArgument(
+        namespace instanceof StateNamespaces.WindowNamespace,
+        "Expected a window namespace for timer %s, but got %s", timerData, namespace);
+
+    runner.processElement(
+        WindowedValue.valueInGlobalWindow(
+            KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
new file mode 100644
index 0000000..3e5d52b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link BagState} in JStorm runner.
+ *
+ * <p>The element added at position {@code i} is stored in {@code kvState} under
+ * {@code (key, namespace, i)}; the position of the most recently added element is stored in
+ * {@code stateInfoKvState} under {@code (key, namespace)}. Store failures are logged and rethrown
+ * as {@link RuntimeException}s carrying the original {@link IOException}.
+ *
+ * @param <K> type of the key this state belongs to (may be {@code null})
+ * @param <T> type of the bag elements
+ */
+class JStormBagState<K, T> implements BagState<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+  private final IKvStore<ComposedKey, Object> stateInfoKvState;
+  // Position at which the next element will be stored, i.e. the current number of elements.
+  private int elemIndex;
+
+  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+      IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+    this.key = key;
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.kvState = checkNotNull(kvState, "kvState");
+    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+    // The store keeps the index of the last element, so the next free slot is one past it.
+    Integer lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
+    this.elemIndex = lastIndex != null ? lastIndex + 1 : 0;
+  }
+
+  @Override
+  public void add(T input) {
+    try {
+      kvState.put(getComposedKey(elemIndex), input);
+      stateInfoKvState.put(getComposedKey(), elemIndex);
+      elemIndex++;
+    } catch (IOException e) {
+      throw reportError(String.format(
+          "Failed to add element %d to bag state %s", elemIndex, getComposedKey()), e);
+    }
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return new ReadableState<Boolean>() {
+      @Override
+      public Boolean read() {
+        return elemIndex <= 0;
+      }
+
+      @Override
+      public ReadableState<Boolean> readLater() {
+        // TODO: support prefetch.
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public Iterable<T> read() {
+    return new BagStateIterable();
+  }
+
+  @Override
+  public BagState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      for (int i = 0; i < elemIndex; i++) {
+        kvState.remove(getComposedKey(i));
+      }
+      stateInfoKvState.remove(getComposedKey());
+      elemIndex = 0;
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to clear bag state %s", getComposedKey()), e);
+    }
+  }
+
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+
+  private ComposedKey getComposedKey(int elemIndex) {
+    return ComposedKey.of(key, namespace, elemIndex);
+  }
+
+  /**
+   * Logs {@code errorInfo} together with {@code e} and returns an unchecked exception, keeping
+   * {@code e} as its cause, for the caller to throw.
+   */
+  private static RuntimeException reportError(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    return new RuntimeException(errorInfo, e);
+  }
+
+  /**
+   * Lazy view over the bag's elements. Each {@link #iterator()} call re-reads the last element
+   * index from {@code stateInfoKvState}; elements are then fetched from {@code kvState} one at a
+   * time.
+   */
+  private class BagStateIterable implements KvStoreIterable<T> {
+
+    private class BagStateIterator implements Iterator<T> {
+      private final int size;
+      private int cursor = 0;
+
+      BagStateIterator() {
+        Integer lastIndex;
+        try {
+          lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
+        } catch (IOException e) {
+          // Failing loudly instead of silently presenting an empty bag.
+          throw reportError(
+              String.format("Failed to get elemIndex for key=%s", getComposedKey()), e);
+        }
+        this.size = lastIndex != null ? lastIndex + 1 : 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return cursor < size;
+      }
+
+      @Override
+      public T next() {
+        if (cursor >= size) {
+          throw new NoSuchElementException();
+        }
+
+        ComposedKey elemKey = getComposedKey(cursor);
+        cursor++;
+        try {
+          return kvState.get(elemKey);
+        } catch (IOException e) {
+          // Failing loudly instead of silently returning a null element.
+          throw reportError(String.format("Failed to read composed key-[%s]", elemKey), e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return new BagStateIterator();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
new file mode 100644
index 0000000..6bd021f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * JStorm implementation of {@link CombiningState}.
+ *
+ * <p>Every {@link #add} and {@link #addAccum} call appends one accumulator to a backing
+ * {@link BagState}; the accumulators are merged with the {@link Combine.CombineFn} whenever the
+ * state is read.
+ *
+ * @param <InputT> type of the values added to the state
+ * @param <AccumT> type of the accumulators kept in the backing bag
+ * @param <OutputT> type of the value returned by {@link #read()}
+ */
+class JStormCombiningState<InputT, AccumT, OutputT>
+    implements CombiningState<InputT, AccumT, OutputT> {
+
+  @Nullable
+  private final BagState<AccumT> accumBagState;
+  private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+  JStormCombiningState(
+      BagState<AccumT> accumBagState,
+      Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+    this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+    this.combineFn = checkNotNull(combineFn, "combineFn");
+  }
+
+  @Override
+  public AccumT getAccum() {
+    // TODO: replacing the accumBagState with the merged accum.
+    return mergedAccum();
+  }
+
+  @Override
+  public void addAccum(AccumT accumT) {
+    accumBagState.add(accumT);
+  }
+
+  @Override
+  public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+    return combineFn.mergeAccumulators(iterable);
+  }
+
+  @Override
+  public void add(InputT input) {
+    accumBagState.add(
+        combineFn.addInput(combineFn.createAccumulator(), input));
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return accumBagState.isEmpty();
+  }
+
+  @Override
+  public OutputT read() {
+    return combineFn.extractOutput(mergedAccum());
+  }
+
+  @Override
+  public CombiningState<InputT, AccumT, OutputT> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    accumBagState.clear();
+  }
+
+  /**
+   * Merges all accumulators held in the backing bag. An empty bag yields a fresh accumulator from
+   * {@link Combine.CombineFn#createAccumulator()} rather than asking the {@code combineFn} to merge
+   * an empty iterable, which some {@link Combine.CombineFn} implementations may not handle.
+   */
+  private AccumT mergedAccum() {
+    Iterable<AccumT> accums = accumBagState.read();
+    if (!accums.iterator().hasNext()) {
+      return combineFn.createAccumulator();
+    }
+    return combineFn.mergeAccumulators(accums);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
new file mode 100644
index 0000000..6a4e376
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MapState} in JStorm runner.
+ *
+ * <p>Map entries are read from and written to {@code kvStore} directly. Store failures are logged
+ * and rethrown as {@link RuntimeException}s carrying the original {@link IOException}.
+ *
+ * @param <K> type of the map keys
+ * @param <V> type of the map values
+ */
+class JStormMapState<K, V> implements MapState<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+  // NOTE(review): key and namespace are stored but never used to scope kvStore accesses;
+  // presumably the caller passes a store that is already scoped to them -- TODO confirm.
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<K, V> kvStore;
+
+  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public void put(K mapKey, V value) {
+    try {
+      kvStore.put(mapKey, value);
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to put key=%s, value=%s", mapKey, value), e);
+    }
+  }
+
+  /**
+   * Stores {@code value} only if {@code mapKey} currently has no (or a {@code null}) value.
+   *
+   * @return a state holding the previous value, or {@code null} if there was none
+   */
+  @Override
+  public ReadableState<V> putIfAbsent(K mapKey, V value) {
+    try {
+      V previous = kvStore.get(mapKey);
+      if (previous == null) {
+        kvStore.put(mapKey, value);
+      }
+      return new MapReadableState<>(previous);
+    } catch (IOException e) {
+      throw reportError(
+          String.format("Failed to putIfAbsent key=%s, value=%s", mapKey, value), e);
+    }
+  }
+
+  @Override
+  public void remove(K mapKey) {
+    try {
+      kvStore.remove(mapKey);
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to remove key=%s", mapKey), e);
+    }
+  }
+
+  @Override
+  public ReadableState<V> get(K mapKey) {
+    try {
+      return new MapReadableState<>(kvStore.get(mapKey));
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to get value for key=%s", mapKey), e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<K>> keys() {
+    try {
+      return new MapReadableState<>(kvStore.keys());
+    } catch (IOException e) {
+      throw reportError("Failed to get keys", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<V>> values() {
+    try {
+      return new MapReadableState<>(kvStore.values());
+    } catch (IOException e) {
+      throw reportError("Failed to get values", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+    try {
+      return new MapReadableState<>(kvStore.entries());
+    } catch (IOException e) {
+      throw reportError("Failed to get entries", e);
+    }
+  }
+
+  @Override
+  public void clear() {
+    try {
+      Iterable<K> keys = kvStore.keys();
+      kvStore.removeBatch(keys);
+    } catch (IOException e) {
+      throw reportError("Failed to clear map state", e);
+    }
+  }
+
+  /**
+   * Logs {@code errorInfo} together with {@code e} and returns an unchecked exception, keeping
+   * {@code e} as its cause, for the caller to throw.
+   */
+  private static RuntimeException reportError(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    return new RuntimeException(errorInfo, e);
+  }
+
+  /** {@link ReadableState} wrapping an already-read value; {@link #readLater()} is a no-op. */
+  private static class MapReadableState<T> implements ReadableState<T> {
+    private final T value;
+
+    MapReadableState(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T read() {
+      return value;
+    }
+
+    @Override
+    public ReadableState<T> readLater() {
+      return this;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java index 1449a43..298ad32 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java @@ -22,8 +22,6 @@ import com.google.common.collect.Iterables; import java.util.List; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; -import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride;
