Repository: beam Updated Branches: refs/heads/gearpump-runner 4c445dd0b -> 1ed16f11a
[BEAM-1180] Implement GearpumpPipelineResult Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21554764 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21554764 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21554764 Branch: refs/heads/gearpump-runner Commit: 21554764056c45ea18be1e844b4ca1bfb71e544a Parents: 4c445dd Author: manuzhang <owenzhang1...@gmail.com> Authored: Tue Dec 20 10:39:56 2016 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Wed Jan 4 12:59:08 2017 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 1 + .../gearpump/GearpumpPipelineResult.java | 59 ++++++++++++++++++-- .../beam/runners/gearpump/GearpumpRunner.java | 4 +- .../runners/gearpump/TestGearpumpRunner.java | 4 ++ .../translators/GroupByKeyTranslator.java | 1 - 5 files changed, 62 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index bb35ad7..777ad34 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -69,6 +69,7 @@ <dependenciesToScan> <dependency>org.apache.beam:beam-sdks-java-core</dependency> </dependenciesToScan> + <argLine>-noverify</argLine> <excludes> <!-- side input is not supported in Gearpump --> <exclude> http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index ed1201d..9c8f7b3 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.gearpump; import java.io.IOException; +import java.util.List; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; @@ -26,31 +27,62 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.gearpump.cluster.MasterToAppMaster; +import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData; +import org.apache.gearpump.cluster.client.ClientContext; import org.joda.time.Duration; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Result of executing a {@link Pipeline} with Gearpump. */ public class GearpumpPipelineResult implements PipelineResult { + + private final ClientContext client; + private final int appId; + private final Duration defaultWaitDuration = Duration.standardSeconds(60); + private final Duration defaultWaitInterval = Duration.standardSeconds(10); + + public GearpumpPipelineResult(ClientContext client, int appId) { + this.client = client; + this.appId = appId; + } + @Override public State getState() { - return null; + return getGearpumpState(); } @Override public State cancel() throws IOException { - return null; + client.shutdown(appId); + return State.CANCELLED; } @Override public State waitUntilFinish(Duration duration) { - return null; + long start = System.currentTimeMillis(); + do { + try { + Thread.sleep(defaultWaitInterval.getMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } while (State.RUNNING == getGearpumpState() + && (System.currentTimeMillis() - start) < duration.getMillis()); + + if (State.RUNNING == getGearpumpState()) { + return State.DONE; + } else { + return State.FAILED; + } } @Override public State waitUntilFinish() { - return null; + return waitUntilFinish(defaultWaitDuration); } @Override @@ -66,4 +98,23 @@ public class GearpumpPipelineResult implements PipelineResult { return null; } + private State getGearpumpState() { + String status = null; + List<AppMasterData> apps = + JavaConverters.<AppMasterData>seqAsJavaListConverter( + (Seq<AppMasterData>) client.listApps().appMasters()).asJava(); + for (AppMasterData app: apps) { + if (app.appId() == appId) { + status = app.status(); + } + } + if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) { + return State.UNKNOWN; + } else if (status.equals(MasterToAppMaster.AppMasterActive())) { + return State.RUNNING; + } else { + return State.STOPPED; + } + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java index 4083922..9c44da3 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -107,9 +107,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { TranslationContext translationContext = new TranslationContext(streamApp, options); GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); translator.translate(pipeline); - streamApp.submit(); + int appId = streamApp.submit(); - return null; + return new GearpumpPipelineResult(clientContext, appId); } private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) { http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java index 89d31a6..ee31fb5 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.gearpump; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; @@ -52,7 +53,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { @Override public GearpumpPipelineResult run(Pipeline pipeline) { GearpumpPipelineResult result = delegate.run(pipeline); + PipelineResult.State state = result.waitUntilFinish(); cluster.stop(); + assert(state == PipelineResult.State.DONE); + return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index d64f1bf..989957f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -134,7 +134,6 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe private static class ValueToIterable<K, V> implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> { - @Override public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) { Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());