[BEAM-2824] Uses PipelineResult in TestJStormRunner.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f40506a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f40506a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f40506a

Branch: refs/heads/jstorm-runner
Commit: 6f40506a979bdcac3d1125bbe809b092d497a2f6
Parents: df75d80
Author: Pei He <p...@apache.org>
Authored: Wed Aug 30 14:50:20 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Mon Sep 4 12:57:53 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunnerResult.java |   6 ++
 .../beam/runners/jstorm/TestJStormRunner.java   | 100 ++++++++++++-------
 .../jstorm/translation/DoFnExecutor.java        |   1 -
 3 files changed, 68 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index 782896e..3962ca2 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -78,6 +78,7 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
 
     private final LocalCluster localCluster;
     private final long localModeExecuteTimeSecs;
+    private boolean cancelled;
 
     LocalJStormPipelineResult(
         String topologyName,
@@ -87,6 +88,7 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
       super(topologyName, config);
       this.localCluster = checkNotNull(localCluster, "localCluster");
       this.localModeExecuteTimeSecs = localModeExecuteTimeSecs;
+      this.cancelled = false;
     }
 
     @Override
@@ -94,11 +96,15 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
       localCluster.killTopology(getTopologyName());
       localCluster.shutdown();
       JStormUtils.sleepMs(1000);
+      cancelled = true;
       return State.CANCELLED;
     }
 
     @Override
     public State waitUntilFinish(Duration duration) {
+      if (cancelled) {
+        return State.CANCELLED;
+      }
       Sleeper sleeper = Sleeper.DEFAULT;
       BackOff backOff = 
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff();
       try {

http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index c9990e4..9d2e2f1 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -21,10 +21,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.metric.AsmMetricRegistry;
-import com.alibaba.jstorm.metric.AsmWindow;
 import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetaType;
-import com.alibaba.jstorm.metric.MetricType;
 import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.google.common.base.Optional;
@@ -34,8 +31,13 @@ import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,31 +82,32 @@ public class TestJStormRunner extends 
PipelineRunner<JStormRunnerResult> {
 
       LOG.info("Running JStorm job {} with {} expected assertions.",
                result.getTopologyName(), numberOfAssertions);
-
-      int maxTimeoutMs =
-          numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : 
RESULT_WAITING_TIME_MS;
-      for (int waitTime = 0; waitTime <= maxTimeoutMs; ) {
-        Optional<Boolean> success = numberOfAssertions > 0
-                ? checkForPAssertSuccess(numberOfAssertions) : 
Optional.<Boolean>absent();
+      if (numberOfAssertions == 0) {
+        result.waitUntilFinish(Duration.millis(RESULT_WAITING_TIME_MS));
         Exception taskExceptionRec = 
TaskReportErrorAndDie.getExceptionRecord();
-        if (success.isPresent() && success.get()) {
-          return result;
-        } else if (success.isPresent() && !success.get()) {
-          throw new AssertionError("Failed assertion checks.");
-        } else if (taskExceptionRec != null) {
+        if (taskExceptionRec != null) {
           LOG.info("Exception was found.", taskExceptionRec);
           throw new RuntimeException(taskExceptionRec.getCause());
-        } else {
-          JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS);
-          waitTime += RESULT_CHECK_INTERVAL_MS;
         }
-      }
-
-      if (numberOfAssertions > 0) {
+        return result;
+      } else {
+        for (int waitTime = 0; waitTime <= ASSERTION_WAITING_TIME_MS;) {
+          Optional<Boolean> success = checkForPAssertSuccess(result.metrics(), 
numberOfAssertions);
+          Exception taskExceptionRec = 
TaskReportErrorAndDie.getExceptionRecord();
+          if (success.isPresent() && success.get()) {
+            return result;
+          } else if (success.isPresent() && !success.get()) {
+            throw new AssertionError("Failed assertion checks.");
+          } else if (taskExceptionRec != null) {
+            LOG.info("Exception was found.", taskExceptionRec);
+            throw new RuntimeException(taskExceptionRec.getCause());
+          } else {
+            JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS);
+            waitTime += RESULT_CHECK_INTERVAL_MS;
+          }
+        }
         LOG.info("Assertion checks timed out.");
         throw new AssertionError("Assertion checks timed out.");
-      } else {
-        return result;
       }
     } finally {
       clearPAssertCount();
@@ -113,31 +116,52 @@ public class TestJStormRunner extends 
PipelineRunner<JStormRunnerResult> {
     }
   }
 
-  private Optional<Boolean> checkForPAssertSuccess(int 
expectedNumberOfAssertions) {
-    int successes = 0;
-    for (AsmMetric metric :
-        JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, 
MetricType.COUNTER)) {
-      successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
+  private Optional<Boolean> checkForPAssertSuccess(
+      MetricResults metricResults,
+      int expectedNumberOfAssertions) {
+    Iterable<MetricResult<Long>> successCounterResults = metricResults
+        .queryMetrics(MetricsFilter.builder()
+            .addNameFilter(MetricNameFilter.named(PAssert.class, 
PAssert.SUCCESS_COUNTER))
+            .build())
+        .counters();
+
+    long successes = 0;
+    for (MetricResult<Long> counter : successCounterResults) {
+      if (counter.attempted() > 0) {
+        successes++;
+      }
     }
-    int failures = 0;
-    for (AsmMetric metric :
-        JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, 
MetricType.COUNTER)) {
-      failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
+
+    Iterable<MetricResult<Long>> failureCounterResults = metricResults
+        .queryMetrics(MetricsFilter.builder()
+            .addNameFilter(MetricNameFilter.named(PAssert.class, 
PAssert.FAILURE_COUNTER))
+            .build())
+        .counters();
+
+    long failures = 0;
+    for (MetricResult<Long> counter : failureCounterResults) {
+      if (counter.attempted() > 0) {
+        failures++;
+      }
     }
 
     if (failures > 0) {
       LOG.info("Found {} success, {} failures out of {} expected assertions.",
-               successes, failures, expectedNumberOfAssertions);
+          successes, failures, expectedNumberOfAssertions);
       return Optional.of(false);
-    } else if (successes >= expectedNumberOfAssertions) {
+    } else if (successes == expectedNumberOfAssertions) {
       LOG.info("Found {} success, {} failures out of {} expected assertions.",
-               successes, failures, expectedNumberOfAssertions);
+          successes, failures, expectedNumberOfAssertions);
       return Optional.of(true);
+    } else if (successes > expectedNumberOfAssertions) {
+      LOG.info("Found {} success, {} failures out of {} expected assertions.",
+          successes, failures, expectedNumberOfAssertions);
+      return Optional.of(false);
+    } else {
+      LOG.info("Found {} success, {} failures out of {} expected assertions.",
+          successes, failures, expectedNumberOfAssertions);
+      return Optional.absent();
     }
-
-    LOG.info("Found {} success, {} failures out of {} expected assertions.",
-             successes, failures, expectedNumberOfAssertions);
-    return Optional.absent();
   }
 
   private void clearPAssertCount() {

http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 1ceaf9e..5425b6c 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -21,7 +21,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.metric.MetricClient;
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.ArrayList;

Reply via email to