http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java new file mode 100644 index 0000000..fce870f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStoreManager; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateBinder; +import org.apache.beam.sdk.state.StateContext; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +/** + * JStorm implementation of {@link StateInternals}. 
+ */ +class JStormStateInternals<K> implements StateInternals { + + private static final String STATE_INFO = "state-info:"; + + @Nullable + private final K key; + private final IKvStoreManager kvStoreManager; + private final TimerService timerService; + private final int executorId; + + public JStormStateInternals(K key, IKvStoreManager kvStoreManager, + TimerService timerService, int executorId) { + this.key = key; + this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); + this.timerService = checkNotNull(timerService, "timerService"); + this.executorId = executorId; + } + + @Nullable + @Override + public K getKey() { + return key; + } + + @Override + public <T extends State> T state( + StateNamespace namespace, StateTag<T> address, StateContext<?> c) { + // throw new UnsupportedOperationException("StateContext is not supported."); + /** + * TODOï¼ + * Same implementation as state() which is without StateContext. This might be updated after + * we figure out if we really need StateContext for JStorm state internals. 
+ */ + return state(namespace, address); + } + + @Override + public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) { + return address.getSpec().bind(address.getId(), new StateBinder() { + @Override + public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { + try { + return new JStormValueState<>( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { + try { + return new JStormBagState( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) { + throw new UnsupportedOperationException(); + } + + @Override + public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder) { + try { + return new JStormMapState<>( + getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public <InputT, AccumT, OutputT> CombiningState bindCombining( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + BagState<AccumT> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + return new JStormCombiningState<>(accumBagState, combineFn); + } catch (IOException e) { + 
throw new RuntimeException(); + } + } + + + @Override + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> + bindCombiningWithContext( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) { + throw new UnsupportedOperationException(); + } + + @Override + public WatermarkHoldState bindWatermark( + String id, + StateSpec<WatermarkHoldState> spec, + final TimestampCombiner timestampCombiner) { + try { + BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + + Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn = + new BinaryCombineFn<Instant>() { + @Override + public Instant apply(Instant left, Instant right) { + return timestampCombiner.combine(left, right); + } + }; + return new JStormWatermarkHoldState( + namespace, + new JStormCombiningState<>( + accumBagState, + outputTimeCombineFn), + timestampCombiner, + timerService); + } catch (IOException e) { + throw new RuntimeException(); + } + } + }); + } + + private String getStoreId(String stateId) { + return String.format("%s-%s", stateId, executorId); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java new file mode 100644 index 0000000..4c96541 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.TimeDomain; +import org.joda.time.Instant; + +/** + * JStorm implementation of {@link TimerInternals}. 
+ */
class JStormTimerInternals<K> implements TimerInternals {

  @Nullable
  private final K key;
  private final DoFnExecutor<?, ?> doFnExecutor;
  private final TimerService timerService;

  /**
   * @param key key the timers belong to; may be {@code null}
   * @param doFnExecutor executor passed to the timer service together with every timer
   * @param timerService service that timers are registered with and watermarks are read from
   */
  public JStormTimerInternals(
      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
    this.key = key;
    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
    this.timerService = checkNotNull(timerService, "timerService");
  }

  /** Builds a {@link TimerData} from the arguments and registers it. */
  @Override
  public void setTimer(
      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
    TimerData timer = TimerData.of(timerId, namespace, target, timeDomain);
    setTimer(timer);
  }

  /** Registers {@code timerData} for this key with the timer service. */
  @Override
  @Deprecated
  public void setTimer(TimerData timerData) {
    timerService.setTimer(key, timerData, doFnExecutor);
  }

  /** Always throws: timer deletion is not supported. */
  @Override
  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
    throw timerDeletionUnsupported();
  }

  /** Always throws: timer deletion is not supported. */
  @Override
  @Deprecated
  public void deleteTimer(StateNamespace namespace, String timerId) {
    throw timerDeletionUnsupported();
  }

  /** Always throws: timer deletion is not supported. */
  @Override
  @Deprecated
  public void deleteTimer(TimerData timerData) {
    throw timerDeletionUnsupported();
  }

  /** Returns the wall-clock time. */
  @Override
  public Instant currentProcessingTime() {
    return Instant.now();
  }

  /** Synchronized processing time is not tracked; always {@code null}. */
  @Override
  @Nullable
  public Instant currentSynchronizedProcessingTime() {
    return null;
  }

  @Override
  public Instant currentInputWatermarkTime() {
    return new Instant(timerService.currentInputWatermark());
  }

  @Override
  @Nullable
  public Instant currentOutputWatermarkTime() {
    return new Instant(timerService.currentOutputWatermark());
  }

  /** Exception shared by every {@code deleteTimer} overload. */
  private static UnsupportedOperationException timerDeletionUnsupported() {
    return new UnsupportedOperationException(
        "Canceling of a timer is not yet supported.");
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java new file mode 100644 index 0000000..5d79d21 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStore; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.ValueState; + +/** + * JStorm implementation of {@link ValueState}. 
+ */
class JStormValueState<K, T> implements ValueState<T> {

  @Nullable
  private final K key;
  private final StateNamespace namespace;
  private final IKvStore<ComposedKey, T> kvState;

  /**
   * @param key the key this cell is scoped to; may be {@code null}
   * @param namespace the namespace this cell is scoped to
   * @param kvState store holding the value under {@code ComposedKey.of(key, namespace)}
   */
  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
    this.key = key;
    this.namespace = namespace;
    this.kvState = kvState;
  }

  /**
   * Stores {@code t} as the current value.
   *
   * @throws RuntimeException wrapping the store's {@link IOException}
   */
  @Override
  public void write(T t) {
    try {
      kvState.put(getComposedKey(), t);
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
    }
  }

  /**
   * Returns the current value as stored in the KV store.
   *
   * @throws RuntimeException wrapping the store's {@link IOException}
   */
  @Override
  public T read() {
    try {
      return kvState.get(getComposedKey());
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Failed to read key: %s, namespace: %s.", key, namespace), e);
    }
  }

  /** Prefetching is not supported yet; returns this cell unchanged. */
  @Override
  public ValueState<T> readLater() {
    // TODO: support prefetch.
    return this;
  }

  /**
   * Removes the stored value.
   *
   * @throws RuntimeException wrapping the store's {@link IOException}
   */
  @Override
  public void clear() {
    try {
      kvState.remove(getComposedKey());
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Failed to clear key: %s, namespace: %s.", key, namespace), e);
    }
  }

  /** Store key for this cell: the (key, namespace) pair. */
  private ComposedKey getComposedKey() {
    return ComposedKey.of(key, namespace);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java new file mode 100644 index 0000000..7e1c28f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +/** + * JStorm implementation of {@link WatermarkHoldState}. 
+ */
class JStormWatermarkHoldState implements WatermarkHoldState {

  private final StateNamespace namespace;
  private final GroupingState<Instant, Instant> watermarkHoldsState;
  private final TimestampCombiner timestampCombiner;
  private final TimerService timerService;

  /**
   * @param namespace namespace whose string key identifies this hold in the timer service
   * @param watermarkHoldsState state that combines and stores the added hold timestamps
   * @param timestampCombiner combiner reported by {@link #getTimestampCombiner()}
   * @param timerService service notified whenever a hold is added or cleared
   */
  JStormWatermarkHoldState(
      StateNamespace namespace,
      GroupingState<Instant, Instant> watermarkHoldsState,
      TimestampCombiner timestampCombiner,
      TimerService timerService) {
    this.namespace = checkNotNull(namespace, "namespace");
    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
    this.timerService = checkNotNull(timerService, "timerService");
  }

  /** Registers the hold with the timer service, then records it in the backing state. */
  @Override
  public void add(Instant instant) {
    timerService.addWatermarkHold(holdId(), instant);
    watermarkHoldsState.add(instant);
  }

  /** Drops the hold from the timer service, then clears the backing state. */
  @Override
  public void clear() {
    timerService.clearWatermarkHold(holdId());
    watermarkHoldsState.clear();
  }

  @Override
  public Instant read() {
    return watermarkHoldsState.read();
  }

  @Override
  public ReadableState<Boolean> isEmpty() {
    return watermarkHoldsState.isEmpty();
  }

  /** Prefetching is not supported yet; returns this state unchanged. */
  @Override
  public WatermarkHoldState readLater() {
    // TODO: support prefetch.
    return this;
  }

  @Override
  public TimestampCombiner getTimestampCombiner() {
    return timestampCombiner;
  }

  /** Identifier of this namespace's hold in the timer service. */
  private String holdId() {
    return namespace.stringKey();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java new file mode 100644 index 0000000..82d8bdc --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.beam.runners.jstorm.translation;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import com.alibaba.jstorm.common.metric.AsmCounter;
import com.alibaba.jstorm.metric.MetricClient;
import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsFilter;

/**
 * Holds a {@link MetricsContainerStepMap} and reports its metrics to the JStorm engine.
 *
 * <p>Only counters are reported. For each counter the last reported attempted value is
 * remembered, and only the increase since that report is passed to {@link AsmCounter#update}.
 */
class MetricsReporter {

  private static final String METRIC_KEY_SEPARATOR = "__";
  private static final String COUNTER_PREFIX = "__counter";

  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
  /** Last attempted value reported to JStorm, per metric name. */
  private final Map<String, Long> reportedCounters = Maps.newHashMap();
  private final MetricClient metricClient;

  /** Creates a reporter that registers its counters through {@code metricClient}. */
  public static MetricsReporter create(MetricClient metricClient) {
    return new MetricsReporter(metricClient);
  }

  private MetricsReporter(MetricClient metricClient) {
    this.metricClient = checkNotNull(metricClient, "metricClient");
  }

  /** Returns the metrics container for step {@code stepName}. */
  public MetricsContainer getMetricsContainer(String stepName) {
    return metricsContainers.getContainer(stepName);
  }

  /** Pushes the attempted values of all counters to JStorm. */
  public void updateMetrics() {
    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
    MetricQueryResults metricQueryResults =
        metricResults.queryMetrics(MetricsFilter.builder().build());
    updateCounters(metricQueryResults.counters());
  }

  /**
   * For every counter whose attempted value grew since the previous report, forwards the increase
   * to JStorm and records the new value as reported.
   */
  private void updateCounters(Iterable<MetricResult<Long>> counters) {
    for (MetricResult<Long> metricResult : counters) {
      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
      Long updateValue = metricResult.attempted();
      Long oldValue = reportedCounters.get(metricName);

      if (oldValue == null || oldValue < updateValue) {
        AsmCounter counter = metricClient.registerCounter(metricName);
        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
        counter.update(incValue);
        // Remember what was reported so that the next call only forwards the further increase
        // rather than the full cumulative value again.
        reportedCounters.put(metricName, updateValue);
      }
    }
  }

  /** Builds {@code <prefix>__<step>__<namespace>__<name>} for a metric result. */
  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
    return prefix
        + METRIC_KEY_SEPARATOR + metricResult.step()
        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
        + METRIC_KEY_SEPARATOR + metricResult.name().name();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java new file mode 100644 index 0000000..49b0f85 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JStorm {@link Executor} for {@link DoFn} with multi-output. + * @param <InputT> + * @param <OutputT> + */ +class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> { + private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); + + /** + * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated + * tag is used in downstream consumer. So before output, we need to map this "local" tag to + * "external" tag. See PCollectionTuple for details. 
+ */ + public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + if (localTupleTagMap.containsKey(tag)) { + executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output); + } else { + executorsBolt.processExecutorElem(tag, output); + } + } + } + + protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap; + + public MultiOutputDoFnExecutor( + String stepName, + String description, + JStormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags, + Map<TupleTag<?>, TupleTag<?>> localTupleTagMap + ) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); + this.localTupleTagMap = localTupleTagMap; + this.outputManager = new MultiOutputDoFnExecutorOutputManager(); + LOG.info("localTupleTagMap: {}", localTupleTagMap); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java new file mode 100644 index 0000000..a3ffc30 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * JStorm {@link Executor} for stateful {@link DoFn} with multi-output. 
+ * @param <OutputT> + */ +class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> { + + public MultiStatefulDoFnExecutor( + String stepName, String description, + JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn, + Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy, + TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + if (mainInputTag.equals(tag)) { + WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; + stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, + executorContext.getExecutorsBolt().timerService())); + stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + processMainInput(elem); + } else { + processSideInput(tag, elem); + } + } + + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + stepContext.setStateInternals(new JStormStateInternals<>(key, + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + super.onTimer(key, timerData); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java new file mode 100644 index 0000000..7daa1cb --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.runners.jstorm.translation;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValueBase;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Translates a {@link ParDo.MultiOutput} into a JStorm {@link DoFnExecutor}.
 *
 * <p>A {@link MultiStatefulDoFnExecutor} is created when the {@link DoFn} declares state or
 * timers; otherwise a {@link MultiOutputDoFnExecutor} is created.
 */
class ParDoBoundMultiTranslator<InputT, OutputT>
    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public void translateNode(
      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
    final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();

    // Map each output tag of this transform ("local") to the first tag of the corresponding
    // output value's own expansion ("external"); see MultiOutputDoFnExecutor.
    Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
    Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
    for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
      Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
      localToExternalTupleTagMap.put(entry.getKey(), itr.next());
    }

    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
    // Copy before removing the main tag so the list returned by the context is never mutated
    // (and an unmodifiable list from the context cannot make remove() throw).
    List<TupleTag<?>> sideOutputTags = new ArrayList<>(userGraphContext.getOutputTags());
    sideOutputTags.remove(mainOutputTag);

    // Side inputs feed both the transform description and the executor's tag -> view map.
    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
    for (PCollectionView pCollectionView : transform.getSideInputs()) {
      TupleTag sideInputTag = userGraphContext.findTupleTag(pCollectionView);
      allInputs.put(sideInputTag, pCollectionView);
      sideInputTagToView.put(sideInputTag, pCollectionView);
    }
    String description = describeTransform(
        transform,
        allInputs,
        allOutputs);

    DoFnExecutor executor;
    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
    if (!signature.stateDeclarations().isEmpty()
        || !signature.timerDeclarations().isEmpty()) {
      executor = new MultiStatefulDoFnExecutor<>(
          userGraphContext.getStepName(),
          description,
          userGraphContext.getOptions(),
          (DoFn<KV, OutputT>) transform.getFn(),
          (Coder) WindowedValue.getFullCoder(
              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
          input.getWindowingStrategy(),
          (TupleTag<KV>) inputTag,
          transform.getSideInputs(),
          sideInputTagToView.build(),
          mainOutputTag,
          sideOutputTags,
          localToExternalTupleTagMap);
    } else {
      executor = new MultiOutputDoFnExecutor<>(
          userGraphContext.getStepName(),
          description,
          userGraphContext.getOptions(),
          transform.getFn(),
          WindowedValue.getFullCoder(
              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
          input.getWindowingStrategy(),
          inputTag,
          transform.getSideInputs(),
          sideInputTagToView.build(),
          mainOutputTag,
          sideOutputTags,
          localToExternalTupleTagMap);
    }

    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java new file mode 100644 index 0000000..6feb7f8 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.jstorm.translation; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Translates a {@link ParDo.SingleOutput} into a JStorm {@link DoFnExecutor}. + */ +class ParDoBoundTranslator<InputT, OutputT> + extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { + + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); + + @Override + public void translateNode( + ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { + final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + final TupleTag<?> inputTag = userGraphContext.getInputTag(); + PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); + + TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); + List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); + + Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + String description = describeTransform( + transform, + allInputs,
userGraphContext.getOutputs()); + + ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + + DoFnExecutor executor; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + // DoFns that declare state or timers are run by the stateful executor. + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + executor = new StatefulDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + (DoFn<KV, OutputT>) transform.getFn(), + (Coder) WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<KV>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags); + } else { + executor = new DoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + transform.getFn(), + WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<InputT>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags); + } + + context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java new file mode 100644 index 0000000..4f469f3 --- /dev/null +++
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +/** + * Utils for JStorm runner. + */ +class RunnerUtils { + /** + * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
+ * @param elem a windowed key/value element + * @return a work item keyed by the element's key, holding the element re-wrapped around its value + */ + public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) { + return SingletonKeyedWorkItem.of( + elem.getValue().getKey(), + elem.withValue(elem.getValue().getValue())); + } + + /** + * Returns true if {@code executor} is a GroupByWindow, stateful or multi-output stateful executor. + */ + public static boolean isGroupByKeyExecutor(Executor executor) { + return executor instanceof GroupByWindowExecutor + || executor instanceof StatefulDoFnExecutor + || executor instanceof MultiStatefulDoFnExecutor; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java new file mode 100644 index 0000000..14d2972 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. + */ +public class SerializedPipelineOptions implements Serializable { + + private final byte[] serializedOptions; + + /** + * Lazily initialized copy of deserialized options. + */ + private transient PipelineOptions pipelineOptions; + + public SerializedPipelineOptions(PipelineOptions options) { + checkNotNull(options, "PipelineOptions must not be null."); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + new ObjectMapper().writeValue(baos, options); + this.serializedOptions = baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Couldn't serialize PipelineOptions.", e); + } + + } + + public PipelineOptions getPipelineOptions() { + if (pipelineOptions == null) { + try { + pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + } + } + + return pipelineOptions; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java new file mode 100644 index 0000000..b321c76 ---
/dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.util.Collections; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * {@link KeyedWorkItem} holding exactly one element and no timers.
+ * @param <K> the key type + * @param <ElemT> the element value type + */ +class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { + + final K key; + final WindowedValue<ElemT> value; + + private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) { + this.key = key; + this.value = value; + } + + public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) { + return new SingletonKeyedWorkItem<K, ElemT>(key, value); + } + + @Override + public K key() { + return key; + } + + public WindowedValue<ElemT> value() { + return value; + } + + @Override + public Iterable<TimerInternals.TimerData> timersIterable() { + return Collections.emptyList(); + } + + @Override + public Iterable<WindowedValue<ElemT>> elementsIterable() { + return Collections.singletonList(value); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java new file mode 100644 index 0000000..911f259 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * JStorm {@link Executor} for stateful {@link DoFn}s whose main input elements are {@link KV}s.
+ * @param <OutputT> output type of the wrapped {@link DoFn} + */ +class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> { + public StatefulDoFnExecutor( + String stepName, String description, JStormPipelineOptions pipelineOptions, + DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag, + Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>> + sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, + mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + if (mainInputTag.equals(tag)) { // Rebind timer and state internals to this element's key. + WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; + stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, + executorContext.getExecutorsBolt().timerService())); + stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + processMainInput(elem); // NOTE(review): confirm executorsBolt == executorContext.getExecutorsBolt(). + } else { + processSideInput(tag, elem); + } + } + + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + stepContext.setStateInternals(new JStormStateInternals<>(key, // State scoped to the timer's key. + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + super.onTimer(key, timerData); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java new file mode 100644 index 0000000..30ff18c --- /dev/null +++
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Stream connection between an upstream {@link Producer} and a downstream {@link Consumer}. + */ +@AutoValue +public abstract class Stream { + + public abstract Producer getProducer(); + + public abstract Consumer getConsumer(); + + public static Stream of(Producer producer, Consumer consumer) { + return new AutoValue_Stream( + producer, consumer); + } + + /** + * JStorm producer: the upstream component id plus the id and name of the emitted stream. + */ + @AutoValue + public abstract static class Producer { + public abstract String getComponentId(); + + public abstract String getStreamId(); + + public abstract String getStreamName(); + + public static Producer of(String componentId, String streamId, String streamName) { + return new AutoValue_Stream_Producer( + componentId, streamId, streamName); + } + } + + /** + * JStorm consumer: the downstream component id and the {@link Grouping} of the connection.
+ */ + @AutoValue + public abstract static class Consumer { + public abstract String getComponentId(); + + public abstract Grouping getGrouping(); + + public static Consumer of(String componentId, Grouping grouping) { + return new AutoValue_Stream_Consumer( + componentId, grouping); + } + } + + /** + * JStorm grouping, which defines how messages are distributed between two nodes. + */ + @AutoValue + public abstract static class Grouping { + public abstract Type getType(); + + @Nullable + public abstract List<String> getFields(); // Non-null only for FIELDS groupings. + + public static Grouping of(Type type) { + checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); // Use byFields(). + return new AutoValue_Stream_Grouping( + type, null /* fields */); + } + + public static Grouping byFields(List<String> fields) { + checkNotNull(fields, "fields"); + checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!"); + return new AutoValue_Stream_Grouping( + Type.FIELDS, fields); + } + + /** + * Types of stream groupings Storm allows. + */ + public enum Type { + ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java new file mode 100644 index 0000000..29345aa --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.runners.core.TimerInternals; +import org.joda.time.Instant; + +/** + * Interface that tracks input watermarks and manages timers in each bolt. + */ +interface TimerService extends Serializable { + + void init(List<Integer> upStreamTasks); // Registers the upstream tasks whose watermarks are tracked. + + /** + * Updates the watermark reported by one upstream task. + * @param task id of the upstream task + * @param inputWatermark the task's new watermark, in epoch milliseconds + * @return the new local input watermark if this update advanced it, otherwise 0 + */ + long updateInputWatermark(Integer task, long inputWatermark); + + long currentInputWatermark(); + + long currentOutputWatermark(); // Input watermark, held back by any active watermark holds. + + void clearWatermarkHold(String namespace); + + void addWatermarkHold(String namespace, Instant watermarkHold); + + void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); + + void fireTimers(long newWatermark); // Fires timers due at or before newWatermark. +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java new file mode 100644 index 0000000..c2600e5 --- /dev/null +++
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.alibaba.jstorm.utils.Pair; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.joda.time.Instant; + +/** + * Default implementation of {@link TimerService}. + * Only event-time timers are supported.
+ */ +class TimerServiceImpl implements TimerService { + private transient ExecutorContext executorContext; + private transient Map<Integer, DoFnExecutor> idToDoFnExecutor; + + private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = + new ConcurrentHashMap<>(); + private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); + private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); + private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); + private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = + new PriorityQueue<>(); + private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> + timerDataToKeyedExecutors = Maps.newHashMap(); + + private boolean initialized = false; + + public TimerServiceImpl() { + } + + public TimerServiceImpl(ExecutorContext executorContext) { + this.executorContext = executorContext; + this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor(); + } + + @Override + public void init(List<Integer> upStreamTasks) { + for (Integer task : upStreamTasks) { + upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + } + initialized = true; + } + + @Override + public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) { + checkState(initialized, "TimerService has not been initialized."); + Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task); + checkArgument(oldTaskInputWatermark != null, "Unknown upstream task: %s.", task); + // Ignore updates that would move this task's watermark backward.
+ if (taskInputWatermark > oldTaskInputWatermark) { + upStreamTaskToInputWatermark.put(task, taskInputWatermark); + inputWatermarks.add(taskInputWatermark); + inputWatermarks.remove(oldTaskInputWatermark); + + // The local input watermark is the minimum over all upstream tasks. + long newLocalInputWatermark = currentInputWatermark(); + if (newLocalInputWatermark > oldTaskInputWatermark) { + return newLocalInputWatermark; + } + } + return 0; + } + + @Override + public void fireTimers(long newWatermark) { + TimerInternals.TimerData timerData; + while ((timerData = eventTimeTimersQueue.peek()) != null + && timerData.getTimestamp().getMillis() <= newWatermark) { + for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { + DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); + executor.onTimer(keyedExecutor.getSecond(), timerData); + } + eventTimeTimersQueue.remove(); + timerDataToKeyedExecutors.remove(timerData); + } + } + + @Override + public long currentInputWatermark() { + return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + @Override + public long currentOutputWatermark() { + if (watermarkHolds.isEmpty()) { + return currentInputWatermark(); + } else { + return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis()); + } + } + + @Override + public void clearWatermarkHold(String namespace) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold != null) { + watermarkHolds.remove(currentHold); + namespaceToWatermarkHold.remove(namespace); + } + } + + @Override + public void addWatermarkHold(String namespace, Instant watermarkHold) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold == null) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); + } else if (watermarkHold.isBefore(currentHold)) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); +
watermarkHolds.remove(currentHold); + } + } + + @Override + public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { + checkArgument( + TimeDomain.EVENT_TIME.equals(timerData.getDomain()), + "Does not support domain: %s.", timerData.getDomain()); + Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData); + if (keyedExecutors == null) { + keyedExecutors = Sets.newHashSet(); + eventTimeTimersQueue.add(timerData); + } + keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); + timerDataToKeyedExecutors.put(timerData, keyedExecutors); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java new file mode 100644 index 0000000..edd3d8a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import java.util.Map; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Interface for classes capable of transforming Beam PTransforms into Storm primitives. + */ +interface TransformTranslator<T extends PTransform<?, ?>> { + + /** Translates {@code transform} using {@code context}. */ + void translateNode(T transform, TranslationContext context); + + /** + * Returns true if this translator can translate the given transform. + */ + boolean canTranslate(T transform, TranslationContext context); + + /** + * Default translator: translation is a no-op and every transform is accepted. + * @param <T1> the type of transform handled + */ + class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { + @Override + public void translateNode(T1 transform, TranslationContext context) { + + } + + @Override + public boolean canTranslate(T1 transform, TranslationContext context) { + return true; + } + + /** Describes a transform as "in1+in2 --> name --> out1+out2" using tuple tag ids. */ + static String describeTransform( + PTransform<?, ?> transform, + Map<TupleTag<?>, PValue> inputs, + Map<TupleTag<?>, PValue> outputs) { + return String.format("%s --> %s --> %s", + Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { + return taggedPValue.getKey().getId(); + } + })), + transform.getName(), + Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { + return taggedPValue.getKey().getId(); + } + }))); + } +
} +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 28d102d..b84fd4a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -34,12 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.Executor; -import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; -import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; -import org.apache.beam.runners.jstorm.translation.translator.Stream; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -import org.apache.beam.runners.jstorm.util.RunnerUtils; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.PValueBase; http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index 316186e..9eaa13a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -19,15 +19,6 @@ package org.apache.beam.runners.jstorm.translation; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator; -import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator; -import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator; -import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator; -import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator; -import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; -import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator; -import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; -import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -40,7 +31,7 @@ import org.slf4j.LoggerFactory; /** * Lookup table mapping PTransform types to associated TransformTranslator implementations. */ -public class TranslatorRegistry { +class TranslatorRegistry { private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); private static final Map<Class<? 
extends PTransform>, TransformTranslator> TRANSLATORS = http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java new file mode 100644 index 0000000..2159cfa --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional wrapper of {@link ExecutorsBolt}: checkpoints and restores its state and timers.
+ */
+public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+  // Kv store id and key under which the current TimerService is persisted at each batch.
+  private static final String TIMER_SERVICE_STORE_ID = "timer_service_store";
+  private static final String TIMER_SERVICE_KEY = "timer_service_key";
+
+  private final ExecutorsBolt executorsBolt;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, TimerService> timerServiceStore;
+
+  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+    this.executorsBolt = executorsBolt;
+    this.executorsBolt.setStatefulBolt(true);
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      executorsBolt.prepare(stormConf, context, collector);
+      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+      timerServiceStore = kvStoreManager.getOrCreate(TIMER_SERVICE_STORE_ID);
+    } catch (IOException e) {
+      LOG.error("Failed to prepare stateful bolt", e);
+      throw new RuntimeException("Failed to prepare stateful bolt", e);
+    }
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    executorsBolt.execute(input);
+  }
+
+  @Override
+  public void cleanup() {
+    executorsBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    executorsBolt.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return executorsBolt.getComponentConfiguration();
+  }
+
+  @Override
+  public void initState(Object userState) {
+    LOG.info("Begin to init from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long batchId) {
+    try {
+      timerServiceStore.put(TIMER_SERVICE_KEY, executorsBolt.timerService());
+    } catch (IOException e) {
+      LOG.error("Failed to store current timer service status", e);
+      throw new RuntimeException("Failed to store current timer service status", e);
+    }
+    kvStoreManager.checkpoint(batchId);
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    LOG.info("Begin to rollback from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    kvStoreManager.remove(batchId);
+  }
+
+  private void restore(Object userState) {
+    try {
+      // Restore all kv stores from the given checkpointed state.
+      kvStoreManager.restore(userState);
+
+      // Reload the timer service, initializing a new one if the store holds none.
+      timerServiceStore = kvStoreManager.getOrCreate(TIMER_SERVICE_STORE_ID);
+      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KEY);
+      if (timerService == null) {
+        timerService = executorsBolt.initTimerService();
+      }
+      executorsBolt.setTimerService(timerService);
+    } catch (IOException e) {
+      LOG.error("Failed to restore state", e);
+      throw new RuntimeException("Failed to restore state", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
new file mode 100644
index 0000000..382cb50
--- /dev/null
+++
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional {@link UnboundedSourceSpout} wrapper that checkpoints and restores source state.
+ */
+public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+  private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+  private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+  private final UnboundedSourceSpout sourceSpout;
+  private UnboundedSource.UnboundedReader reader;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+  public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+    this.sourceSpout = sourceSpout;
+  }
+
+  private void restore(Object userState) {
+    try {
+      kvStoreManager.restore(userState);
+      sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+      UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+      sourceSpout.createSourceReader(checkpointMark);
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to init state", e);
+      throw new RuntimeException("Failed to init state", e);
+    }
+  }
+
+  @Override
+  public void initState(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long checkpointId) {
+    try {
+      // Store the checkpoint mark of the unbounded source reader
+      UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+      sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+      // checkpoint all kv stores in current manager
+      kvStoreManager.checkpoint(checkpointId);
+    } catch (IOException e) {
+      LOG.error("Failed to finish batch-{}", checkpointId, e);
+      throw new RuntimeException("Failed to finish batch-" + checkpointId, e);
+    }
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    // backup kv stores to remote state backend
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    // remove obsolete state from the local and remote state backends
+    kvStoreManager.remove(batchId);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    sourceSpout.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return sourceSpout.getComponentConfiguration();
+  }
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      sourceSpout.open(conf, context, collector);
+      String storeName = String.format("task-%s", context.getThisTaskId());
+      String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+          context, storeName, storePath, true);
+
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open transactional unbounded source spout", e);
+      throw new RuntimeException("Failed to open transactional unbounded source spout", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    sourceSpout.close();
+  }
+
+  @Override
+  public void activate() {
+    sourceSpout.activate();
+  }
+
+  @Override
+  public void deactivate() {
+    sourceSpout.deactivate();
+  }
+
+  @Override
+  public void nextTuple() {
+    sourceSpout.nextTuple();
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+}
