This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d11bca  [BEAM-7668] Add ability to query a pipeline definition from a gRPC JobService
     new 05dba6a  Merge PR #8977
7d11bca is described below

commit 7d11bca6aa5039bcd8ea7badb1319b6dcca9fb8e
Author: Chad Dombrova <chad...@gmail.com>
AuthorDate: Mon Jul 1 16:16:59 2019 -0400

    [BEAM-7668] Add ability to query a pipeline definition from a gRPC JobService
---
 .../src/main/proto/beam_job_api.proto              | 16 +++++
 .../jobsubmission/InMemoryJobService.java          | 23 +++++++
 .../fnexecution/jobsubmission/JobInvocation.java   |  5 ++
 .../jobsubmission/InMemoryJobServiceTest.java      | 70 ++++++++++++++++++----
 .../runners/portability/local_job_service.py       |  4 ++
 5 files changed, 108 insertions(+), 10 deletions(-)

diff --git a/model/job-management/src/main/proto/beam_job_api.proto 
b/model/job-management/src/main/proto/beam_job_api.proto
index c7b972e..1226581 100644
--- a/model/job-management/src/main/proto/beam_job_api.proto
+++ b/model/job-management/src/main/proto/beam_job_api.proto
@@ -46,6 +46,9 @@ service JobService {
   // Get the current state of the job
   rpc GetState (GetJobStateRequest) returns (GetJobStateResponse);
 
+  // Get the job's pipeline
+  rpc GetPipeline (GetJobPipelineRequest) returns (GetJobPipelineResponse);
+
   // Cancel the job
   rpc Cancel (CancelJobRequest) returns (CancelJobResponse);
 
@@ -134,6 +137,19 @@ message GetJobStateResponse {
 }
 
 
+// GetPipeline is a synchronous request that returns a pipeline back
+// Throws error GRPC_STATUS_UNAVAILABLE if server is down
+// Throws error NOT_FOUND if the jobId is not found
+message GetJobPipelineRequest {
+  string job_id = 1; // (required)
+
+}
+
+message GetJobPipelineResponse {
+  org.apache.beam.model.pipeline.v1.Pipeline pipeline = 1; // (required)
+}
+
+
 // GetJobMessages is a streaming api for streaming job messages from the service
 // One request will connect you to the job and you'll get a stream of job state
 // and job messages back; one is used for logging and the other for detecting
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 642d3bd..5a3f228 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -26,6 +26,8 @@ import 
org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
 import 
org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsRequest;
 import 
org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
@@ -38,6 +40,7 @@ import 
org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.PipelineValidator;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
@@ -235,6 +238,26 @@ public class InMemoryJobService extends 
JobServiceGrpc.JobServiceImplBase implem
   }
 
   @Override
+  public void getPipeline(
+      GetJobPipelineRequest request, StreamObserver<GetJobPipelineResponse> 
responseObserver) {
+    LOG.trace("{} {}", GetJobPipelineRequest.class.getSimpleName(), request);
+    String invocationId = request.getJobId();
+    try {
+      JobInvocation invocation = getInvocation(invocationId);
+      RunnerApi.Pipeline pipeline = invocation.getPipeline();
+      GetJobPipelineResponse response =
+          GetJobPipelineResponse.newBuilder().setPipeline(pipeline).build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      String errMessage =
+          String.format("Encountered Unexpected Exception for Invocation %s", 
invocationId);
+      LOG.error(errMessage, e);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  @Override
   public void cancel(CancelJobRequest request, 
StreamObserver<CancelJobResponse> responseObserver) {
     LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request);
     String invocationId = request.getJobId();
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index ab4fda7..819a007 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -175,6 +175,11 @@ public class JobInvocation {
     return this.jobState;
   }
 
+  /** Retrieve the job's pipeline. */
+  public RunnerApi.Pipeline getPipeline() {
+    return this.pipeline;
+  }
+
   /** Listen for job state changes with a {@link Consumer}. */
   public synchronized void addStateListener(Consumer<JobState.Enum> 
stateStreamObserver) {
     stateStreamObserver.accept(getState());
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
index 961016a..01346af 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.jobsubmission;
 
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
 import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
@@ -31,6 +32,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,6 +64,35 @@ public class InMemoryJobServiceTest {
         InMemoryJobService.create(stagingServiceDescriptor, session -> 
"token", null, invoker);
     when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, 
TEST_RETRIEVAL_TOKEN)).thenReturn(invocation);
     when(invocation.getId()).thenReturn(TEST_JOB_ID);
+    when(invocation.getPipeline()).thenReturn(TEST_PIPELINE);
+  }
+
+  private JobApi.PrepareJobResponse prepareJob() {
+    JobApi.PrepareJobRequest request =
+        JobApi.PrepareJobRequest.newBuilder()
+            .setJobName(TEST_JOB_NAME)
+            .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
+            .setPipelineOptions(Struct.getDefaultInstance())
+            .build();
+    RecordingObserver<JobApi.PrepareJobResponse> recorder = new 
RecordingObserver<>();
+    service.prepare(request, recorder);
+    return recorder.values.get(0);
+  }
+
+  private JobApi.RunJobResponse runJob(String preparationId) {
+    JobApi.RunJobRequest runRequest =
+        JobApi.RunJobRequest.newBuilder()
+            .setPreparationId(preparationId)
+            .setRetrievalToken(TEST_RETRIEVAL_TOKEN)
+            .build();
+    RecordingObserver<JobApi.RunJobResponse> recorder = new 
RecordingObserver<>();
+    service.run(runRequest, recorder);
+    return recorder.values.get(0);
+  }
+
+  private JobApi.RunJobResponse prepareAndRunJob() {
+    JobApi.PrepareJobResponse prepareResponse = prepareJob();
+    return runJob(prepareResponse.getPreparationId());
   }
 
   @Test
@@ -82,17 +113,36 @@ public class InMemoryJobServiceTest {
   }
 
   @Test
+  public void testGetPipelineFailure() {
+    prepareJob();
+
+    JobApi.GetJobPipelineRequest request =
+        
JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
+    RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new 
RecordingObserver<>();
+    service.getPipeline(request, recorder);
+    // job has not been run yet
+    assertThat(recorder.isSuccessful(), is(false));
+    assertThat(recorder.error, isA(StatusException.class));
+  }
+
+  @Test
+  public void testGetPipelineIsSuccessful() throws Exception {
+    prepareAndRunJob();
+
+    JobApi.GetJobPipelineRequest request =
+        
JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build();
+    RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new 
RecordingObserver<>();
+    service.getPipeline(request, recorder);
+    assertThat(recorder.isSuccessful(), is(true));
+    assertThat(recorder.values, hasSize(1));
+    JobApi.GetJobPipelineResponse response = recorder.values.get(0);
+    assertThat(response.getPipeline(), is(TEST_PIPELINE));
+  }
+
+  @Test
   public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
-    // prepare job
-    JobApi.PrepareJobRequest prepareRequest =
-        JobApi.PrepareJobRequest.newBuilder()
-            .setJobName(TEST_JOB_NAME)
-            .setPipeline(RunnerApi.Pipeline.getDefaultInstance())
-            .setPipelineOptions(Struct.getDefaultInstance())
-            .build();
-    RecordingObserver<JobApi.PrepareJobResponse> prepareRecorder = new 
RecordingObserver<>();
-    service.prepare(prepareRequest, prepareRecorder);
-    JobApi.PrepareJobResponse prepareResponse = prepareRecorder.values.get(0);
+    JobApi.PrepareJobResponse prepareResponse = prepareJob();
+
     // run job
     JobApi.RunJobRequest runRequest =
         JobApi.RunJobRequest.newBuilder()
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py 
b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 91ceff7..48193ee 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -131,6 +131,10 @@ class 
LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     return beam_job_api_pb2.GetJobStateResponse(
         state=self._jobs[request.job_id].state)
 
+  def GetPipeline(self, request, context=None):
+    return beam_job_api_pb2.GetJobPipelineResponse(
+        pipeline=self._jobs[request.job_id]._pipeline_proto)
+
   def Cancel(self, request, context=None):
     self._jobs[request.job_id].cancel()
     return beam_job_api_pb2.CancelJobRequest(

Reply via email to