[BEAM-2824] Support PipelineResult.waitUntilFinish() in JStorm local mode.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df75d807 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df75d807 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df75d807 Branch: refs/heads/jstorm-runner Commit: df75d8074d7067bf6078289f1f2dfa36548fcd5e Parents: cda4e62 Author: Pei He <p...@apache.org> Authored: Tue Aug 29 20:10:06 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Mon Sep 4 12:57:50 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunnerResult.java | 44 +++++++++++++++++++- .../jstorm/translation/CommonInstance.java | 5 +++ .../jstorm/translation/DoFnExecutor.java | 4 +- .../jstorm/translation/ExecutorsBolt.java | 35 +++++++++++++--- .../translation/GroupByWindowExecutor.java | 2 +- .../jstorm/translation/MetricsReporter.java | 6 +-- .../translation/UnboundedSourceSpout.java | 38 ++++++++++------- 7 files changed, 104 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 98d967f..782896e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -17,15 +17,26 @@ */ package org.apache.beam.runners.jstorm; +import static com.alibaba.jstorm.metric.AsmWindow.M10_WINDOW; import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.Config; import backtype.storm.LocalCluster; +import 
com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.utils.JStormUtils; import java.io.IOException; +import java.util.Map; +import org.apache.beam.runners.jstorm.translation.CommonInstance; import org.apache.beam.runners.jstorm.translation.JStormMetricResults; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.joda.time.Duration; /** @@ -88,12 +99,21 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public State waitUntilFinish(Duration duration) { - JStormUtils.sleepMs(duration.getMillis()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backOff = FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff(); try { - return cancel(); + do { + if (globalWatermark() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + return State.DONE; + } + } while (BackOffUtils.next(sleeper, backOff)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Ignore InterruptedException } catch (IOException e) { throw new RuntimeException(e); } + return State.RUNNING; } @Override @@ -105,5 +125,25 @@ public abstract class JStormRunnerResult implements PipelineResult { public MetricResults metrics() { return new JStormMetricResults(); } + + private long globalWatermark() { + AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); + boolean foundWatermark = false; + double min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + for (Map.Entry<String, AsmGauge> entry : metricRegistry.getGauges().entrySet()) { + if (entry.getKey().endsWith(CommonInstance.BEAM_OUTPUT_WATERMARK_METRICS)) { + foundWatermark = 
true; + double outputWatermark = (double) entry.getValue().getValue(M10_WINDOW); + if (outputWatermark < min) { + min = outputWatermark; + } + } + } + if (foundWatermark) { + return (long) min; + } else { + return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java index b7154cd..1e4f4ae 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java @@ -25,4 +25,9 @@ public class CommonInstance { public static final String VALUE = "Value"; public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; + + public static final String METRIC_KEY_SEPARATOR = "__"; + public static final String BEAM_SOURCE_WATERMARK_METRICS = "__beam_source_watermark"; + public static final String BEAM_INPUT_WATERMARK_METRICS = "__beam_input_watermark"; + public static final String BEAM_OUTPUT_WATERMARK_METRICS = "__beam_output_watermark"; } http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 72c386a..1ceaf9e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -120,7 +120,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor { protected transient StateTag<WatermarkHoldState> watermarkHoldTag; protected transient IKvStoreManager kvStoreManager; protected transient DefaultStepContext stepContext; - protected transient MetricClient metricClient; public DoFnExecutor( String stepName, @@ -159,7 +158,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { this.sideOutputTags, this.stepContext, this.windowingStrategy), - MetricsReporter.create(metricClient)); + MetricsReporter.create(executorsBolt.metricClient())); } protected void initService(ExecutorContext context) { @@ -170,7 +169,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor { stepContext = new DefaultStepContext(timerInternals, new JStormStateInternals( null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - metricClient = new MetricClient(executorContext.getTopologyContext()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index 449ecb5..3d58a37 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -28,6 +28,8 @@ import backtype.storm.tuple.Values; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.metric.MetricClient; +import com.alibaba.jstorm.metrics.Gauge; import 
com.alibaba.jstorm.utils.KryoSerializer; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -60,7 +62,9 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { protected transient ExecutorContext executorContext; - protected transient TimerService timerService; + private transient TimerService timerService; + + private transient MetricClient metricClient; // map from input tag to executor inside bolt protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); @@ -80,7 +84,6 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { protected KryoSerializer<WindowedValue> serializer; public ExecutorsBolt() { - } public void setStatefulBolt(boolean isStateful) { @@ -150,12 +153,13 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - LOG.info("Start to prepare for task-{}", context.getThisTaskId()); + int taskId = context.getThisTaskId(); + LOG.info("Start to prepare for task-{}", taskId); try { this.collector = collector; // init kv store manager - String storeName = String.format("task-%d", context.getThisTaskId()); + String storeName = String.format("task-%d", taskId); String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); IKvStoreManager kvStoreManager = isStatefulBolt ? 
KvStoreManagerFactory.getKvStoreManagerWithMonitor( @@ -167,6 +171,23 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { // init time service timerService = initTimerService(); + // init metrics + metricClient = new MetricClient(executorContext.getTopologyContext()); + metricClient.registerGauge( + context.getThisComponentId() + CommonInstance.BEAM_INPUT_WATERMARK_METRICS, + new Gauge<Double>() { + @Override + public Double getValue() { + return (double) timerService.currentInputWatermark(); + }}); + metricClient.registerGauge( + context.getThisComponentId() + CommonInstance.BEAM_OUTPUT_WATERMARK_METRICS, + new Gauge<Double>() { + @Override + public Double getValue() { + return (double) timerService.currentOutputWatermark(); + }}); + // init all internal executors for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { executor.init(executorContext); @@ -175,7 +196,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { } } - this.serializer = new KryoSerializer<WindowedValue>(stormConf); + this.serializer = new KryoSerializer<>(stormConf); LOG.info("ExecutorsBolt finished init. 
LocalExecutors={}", inputTagToExecutor.values()); LOG.info("inputTagToExecutor={}", inputTagToExecutor); @@ -296,6 +317,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { return timerService; } + public MetricClient metricClient() { + return metricClient; + } + public void setTimerService(TimerService service) { timerService = service; } http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java index cae1bc3..4455e0f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java @@ -132,7 +132,7 @@ class GroupByWindowExecutor<K, V> this.stepContext, this.windowingStrategy); return new DoFnRunnerWithMetrics<>( - stepName, doFnRunner, MetricsReporter.create(metricClient)); + stepName, doFnRunner, MetricsReporter.create(executorsBolt.metricClient())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index cc8c1f8..1e38d1c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ 
-107,8 +107,8 @@ class MetricsReporter { private String getMetricNameString(String prefix, MetricResult<?> metricResult) { return prefix - + METRIC_KEY_SEPARATOR + metricResult.step() - + METRIC_KEY_SEPARATOR + metricResult.name().namespace() - + METRIC_KEY_SEPARATOR + metricResult.name().name(); + + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.step() + + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.name().namespace() + + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.name().name(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java index 627a834..73f1f0d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -23,6 +23,8 @@ import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Values; +import com.alibaba.jstorm.metric.MetricClient; +import com.alibaba.jstorm.metrics.Gauge; import com.alibaba.jstorm.utils.KryoSerializer; import java.io.IOException; import java.util.Map; @@ -115,28 +117,32 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou @Override public synchronized void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - this.collector = collector; - this.pipelineOptions = - this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); - - createSourceReader(null); + this.collector = collector; + this.pipelineOptions 
= + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + this.serializer = new KryoSerializer<>(conf); + createSourceReader(null); + new MetricClient(context).registerGauge( + context.getThisComponentId() + CommonInstance.BEAM_SOURCE_WATERMARK_METRICS, + new Gauge<Double>() { + @Override + public Double getValue() { + return (double) reader.getWatermark().getMillis(); + }}); + } - this.serializer = new KryoSerializer<>(conf); + public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) { + try { + if (reader != null) { + reader.close(); + } + reader = this.source.createReader(this.pipelineOptions, checkpointMark); + hasNextRecord = this.reader.start(); } catch (IOException e) { throw new RuntimeException("Unable to create unbounded reader.", e); } } - public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) - throws IOException { - if (reader != null) { - reader.close(); - } - reader = this.source.createReader(this.pipelineOptions, checkpointMark); - hasNextRecord = this.reader.start(); - } - @Override public synchronized void nextTuple() { if (!activated.get()) {