[BEAM-2824] support PipelineResult.waitUntilFinish() in jstorm local mode.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df75d807
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df75d807
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df75d807

Branch: refs/heads/jstorm-runner
Commit: df75d8074d7067bf6078289f1f2dfa36548fcd5e
Parents: cda4e62
Author: Pei He <p...@apache.org>
Authored: Tue Aug 29 20:10:06 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Mon Sep 4 12:57:50 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunnerResult.java | 44 +++++++++++++++++++-
 .../jstorm/translation/CommonInstance.java      |  5 +++
 .../jstorm/translation/DoFnExecutor.java        |  4 +-
 .../jstorm/translation/ExecutorsBolt.java       | 35 +++++++++++++---
 .../translation/GroupByWindowExecutor.java      |  2 +-
 .../jstorm/translation/MetricsReporter.java     |  6 +--
 .../translation/UnboundedSourceSpout.java       | 38 ++++++++++-------
 7 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index 98d967f..782896e 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -17,15 +17,26 @@
  */
 package org.apache.beam.runners.jstorm;
 
+import static com.alibaba.jstorm.metric.AsmWindow.M10_WINDOW;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.metric.AsmMetricRegistry;
+import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.utils.JStormUtils;
 import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.translation.CommonInstance;
 import org.apache.beam.runners.jstorm.translation.JStormMetricResults;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.joda.time.Duration;
 
 /**
@@ -88,12 +99,21 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
 
     @Override
     public State waitUntilFinish(Duration duration) {
-      JStormUtils.sleepMs(duration.getMillis());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backOff = 
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff();
       try {
-        return cancel();
+        do {
+          if (globalWatermark() >= 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+            return State.DONE;
+          }
+        } while (BackOffUtils.next(sleeper, backOff));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // Ignore InterruptedException
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      return State.RUNNING;
     }
 
     @Override
@@ -105,5 +125,25 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
     public MetricResults metrics() {
       return new JStormMetricResults();
     }
+
+    private long globalWatermark() {
+      AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics();
+      boolean foundWatermark = false;
+      double min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+      for (Map.Entry<String, AsmGauge> entry : 
metricRegistry.getGauges().entrySet()) {
+        if 
(entry.getKey().endsWith(CommonInstance.BEAM_OUTPUT_WATERMARK_METRICS)) {
+          foundWatermark = true;
+          double outputWatermark = (double) 
entry.getValue().getValue(M10_WINDOW);
+          if (outputWatermark < min) {
+            min = outputWatermark;
+          }
+        }
+      }
+      if (foundWatermark) {
+        return (long) min;
+      } else {
+        return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
index b7154cd..1e4f4ae 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
@@ -25,4 +25,9 @@ public class CommonInstance {
   public static final String VALUE = "Value";
 
   public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+
+  public static final String METRIC_KEY_SEPARATOR = "__";
+  public static final String BEAM_SOURCE_WATERMARK_METRICS = 
"__beam_source_watermark";
+  public static final String BEAM_INPUT_WATERMARK_METRICS = 
"__beam_input_watermark";
+  public static final String BEAM_OUTPUT_WATERMARK_METRICS = 
"__beam_output_watermark";
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 72c386a..1ceaf9e 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -120,7 +120,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
   protected transient IKvStoreManager kvStoreManager;
   protected transient DefaultStepContext stepContext;
-  protected transient MetricClient metricClient;
 
   public DoFnExecutor(
       String stepName,
@@ -159,7 +158,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
             this.sideOutputTags,
             this.stepContext,
             this.windowingStrategy),
-        MetricsReporter.create(metricClient));
+        MetricsReporter.create(executorsBolt.metricClient()));
   }
 
   protected void initService(ExecutorContext context) {
@@ -170,7 +169,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
     stepContext = new DefaultStepContext(timerInternals,
         new JStormStateInternals(
             null, kvStoreManager, executorsBolt.timerService(), 
internalDoFnExecutorId));
-    metricClient = new MetricClient(executorContext.getTopologyContext());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index 449ecb5..3d58a37 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -28,6 +28,8 @@ import backtype.storm.tuple.Values;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.KvStoreManagerFactory;
 import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.alibaba.jstorm.metrics.Gauge;
 import com.alibaba.jstorm.utils.KryoSerializer;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -60,7 +62,9 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
 
   protected transient ExecutorContext executorContext;
 
-  protected transient TimerService timerService;
+  private transient TimerService timerService;
+
+  private transient MetricClient metricClient;
 
   // map from input tag to executor inside bolt
   protected final Map<TupleTag, Executor> inputTagToExecutor = 
Maps.newHashMap();
@@ -80,7 +84,6 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
   protected KryoSerializer<WindowedValue> serializer;
 
   public ExecutorsBolt() {
-
   }
 
   public void setStatefulBolt(boolean isStateful) {
@@ -150,12 +153,13 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) {
-    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+    int taskId = context.getThisTaskId();
+    LOG.info("Start to prepare for task-{}", taskId);
     try {
       this.collector = collector;
 
       // init kv store manager
-      String storeName = String.format("task-%d", context.getThisTaskId());
+      String storeName = String.format("task-%d", taskId);
       String stateStorePath = String.format("%s/beam/%s", 
context.getWorkerIdDir(), storeName);
       IKvStoreManager kvStoreManager = isStatefulBolt
               ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
@@ -167,6 +171,23 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
       // init time service
       timerService = initTimerService();
 
+      // init metrics
+      metricClient = new MetricClient(executorContext.getTopologyContext());
+      metricClient.registerGauge(
+          context.getThisComponentId() + 
CommonInstance.BEAM_INPUT_WATERMARK_METRICS,
+          new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+              return (double) timerService.currentInputWatermark();
+            }});
+      metricClient.registerGauge(
+          context.getThisComponentId() + 
CommonInstance.BEAM_OUTPUT_WATERMARK_METRICS,
+          new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+              return (double) timerService.currentOutputWatermark();
+            }});
+
       // init all internal executors
       for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
         executor.init(executorContext);
@@ -175,7 +196,7 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
         }
       }
 
-      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
+      this.serializer = new KryoSerializer<>(stormConf);
 
       LOG.info("ExecutorsBolt finished init. LocalExecutors={}", 
inputTagToExecutor.values());
       LOG.info("inputTagToExecutor={}", inputTagToExecutor);
@@ -296,6 +317,10 @@ public class ExecutorsBolt extends AbstractComponent 
implements IRichBatchBolt {
     return timerService;
   }
 
+  public MetricClient metricClient() {
+    return metricClient;
+  }
+
   public void setTimerService(TimerService service) {
     timerService = service;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
index cae1bc3..4455e0f 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
@@ -132,7 +132,7 @@ class GroupByWindowExecutor<K, V>
             this.stepContext,
             this.windowingStrategy);
     return new DoFnRunnerWithMetrics<>(
-        stepName, doFnRunner, MetricsReporter.create(metricClient));
+        stepName, doFnRunner, 
MetricsReporter.create(executorsBolt.metricClient()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
index cc8c1f8..1e38d1c 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -107,8 +107,8 @@ class MetricsReporter {
 
   private String getMetricNameString(String prefix, MetricResult<?> 
metricResult) {
     return prefix
-        + METRIC_KEY_SEPARATOR + metricResult.step()
-        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
-        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+        + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.step()
+        + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.name().name();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index 627a834..73f1f0d 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -23,6 +23,8 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.alibaba.jstorm.metrics.Gauge;
 import com.alibaba.jstorm.utils.KryoSerializer;
 import java.io.IOException;
 import java.util.Map;
@@ -115,28 +117,32 @@ public class UnboundedSourceSpout extends 
AbstractComponent implements IRichSpou
 
   @Override
   public synchronized void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
-    try {
-      this.collector = collector;
-      this.pipelineOptions =
-          
this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
-      createSourceReader(null);
+    this.collector = collector;
+    this.pipelineOptions =
+        
this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+    this.serializer = new KryoSerializer<>(conf);
+    createSourceReader(null);
+    new MetricClient(context).registerGauge(
+        context.getThisComponentId() + 
CommonInstance.BEAM_SOURCE_WATERMARK_METRICS,
+        new Gauge<Double>() {
+          @Override
+          public Double getValue() {
+            return (double) reader.getWatermark().getMillis();
+          }});
+  }
 
-      this.serializer = new KryoSerializer<>(conf);
+  public synchronized void createSourceReader(UnboundedSource.CheckpointMark 
checkpointMark) {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+      reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+      hasNextRecord = this.reader.start();
     } catch (IOException e) {
       throw new RuntimeException("Unable to create unbounded reader.", e);
     }
   }
 
-  public synchronized void createSourceReader(UnboundedSource.CheckpointMark 
checkpointMark)
-      throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
-    hasNextRecord = this.reader.start();
-  }
-
   @Override
   public synchronized void nextTuple() {
     if (!activated.get()) {

Reply via email to