This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7d11bca [BEAM-7668] Add ability to query a pipeline definition from a gRPC JobService new 05dba6a Merge PR #8977 7d11bca is described below commit 7d11bca6aa5039bcd8ea7badb1319b6dcca9fb8e Author: Chad Dombrova <chad...@gmail.com> AuthorDate: Mon Jul 1 16:16:59 2019 -0400 [BEAM-7668] Add ability to query a pipeline definition from a gRPC JobService --- .../src/main/proto/beam_job_api.proto | 16 +++++ .../jobsubmission/InMemoryJobService.java | 23 +++++++ .../fnexecution/jobsubmission/JobInvocation.java | 5 ++ .../jobsubmission/InMemoryJobServiceTest.java | 70 ++++++++++++++++++---- .../runners/portability/local_job_service.py | 4 ++ 5 files changed, 108 insertions(+), 10 deletions(-) diff --git a/model/job-management/src/main/proto/beam_job_api.proto b/model/job-management/src/main/proto/beam_job_api.proto index c7b972e..1226581 100644 --- a/model/job-management/src/main/proto/beam_job_api.proto +++ b/model/job-management/src/main/proto/beam_job_api.proto @@ -46,6 +46,9 @@ service JobService { // Get the current state of the job rpc GetState (GetJobStateRequest) returns (GetJobStateResponse); + // Get the job's pipeline + rpc GetPipeline (GetJobPipelineRequest) returns (GetJobPipelineResponse); + // Cancel the job rpc Cancel (CancelJobRequest) returns (CancelJobResponse); @@ -134,6 +137,19 @@ message GetJobStateResponse { } +// GetPipeline is a synchronus request that returns a pipeline back +// Throws error GRPC_STATUS_UNAVAILABLE if server is down +// Throws error NOT_FOUND if the jobId is not found +message GetJobPipelineRequest { + string job_id = 1; // (required) + +} + +message GetJobPipelineResponse { + org.apache.beam.model.pipeline.v1.Pipeline pipeline = 1; // (required) +} + + // GetJobMessages is a streaming api for streaming job messages from the service // One request will connect you to the job and you'll get a stream of job state // and job messages back; one is used for logging and the other for detecting diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java index 642d3bd..5a3f228 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java @@ -26,6 +26,8 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; import org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsRequest; import org.apache.beam.model.jobmanagement.v1.JobApi.DescribePipelineOptionsResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobPipelineResponse; import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; @@ -38,6 +40,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.graph.PipelineValidator; import org.apache.beam.runners.fnexecution.FnService; import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; @@ -235,6 +238,26 @@ public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implem } @Override + public void getPipeline( + GetJobPipelineRequest request, StreamObserver<GetJobPipelineResponse> responseObserver) { + LOG.trace("{} {}", GetJobPipelineRequest.class.getSimpleName(), request); + String invocationId = request.getJobId(); + try { + JobInvocation invocation = getInvocation(invocationId); + RunnerApi.Pipeline pipeline = invocation.getPipeline(); + GetJobPipelineResponse response = + GetJobPipelineResponse.newBuilder().setPipeline(pipeline).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + String errMessage = + String.format("Encountered Unexpected Exception for Invocation %s", invocationId); + LOG.error(errMessage, e); + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override public void cancel(CancelJobRequest request, StreamObserver<CancelJobResponse> responseObserver) { LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request); String invocationId = request.getJobId(); diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java index ab4fda7..819a007 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java @@ -175,6 +175,11 @@ public class JobInvocation { return this.jobState; } + /** Retrieve the job's pipeline. */ + public RunnerApi.Pipeline getPipeline() { + return this.pipeline; + } + /** Listen for job state changes with a {@link Consumer}. */ public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) { stateStreamObserver.accept(getState()); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java index 961016a..01346af 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.jobsubmission; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.Is.isA; import static org.hamcrest.core.IsNull.notNullValue; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; @@ -31,6 +32,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; +import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException; import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver; import org.junit.Before; import org.junit.Test; @@ -62,6 +64,35 @@ public class InMemoryJobServiceTest { InMemoryJobService.create(stagingServiceDescriptor, session -> "token", null, invoker); when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, TEST_RETRIEVAL_TOKEN)).thenReturn(invocation); when(invocation.getId()).thenReturn(TEST_JOB_ID); + when(invocation.getPipeline()).thenReturn(TEST_PIPELINE); + } + + private JobApi.PrepareJobResponse prepareJob() { + JobApi.PrepareJobRequest request = + JobApi.PrepareJobRequest.newBuilder() + .setJobName(TEST_JOB_NAME) + .setPipeline(RunnerApi.Pipeline.getDefaultInstance()) + .setPipelineOptions(Struct.getDefaultInstance()) + .build(); + RecordingObserver<JobApi.PrepareJobResponse> recorder = new RecordingObserver<>(); + service.prepare(request, recorder); + return recorder.values.get(0); + } + + private JobApi.RunJobResponse runJob(String preparationId) { + JobApi.RunJobRequest runRequest = + JobApi.RunJobRequest.newBuilder() + .setPreparationId(preparationId) + .setRetrievalToken(TEST_RETRIEVAL_TOKEN) + .build(); + RecordingObserver<JobApi.RunJobResponse> recorder = new RecordingObserver<>(); + service.run(runRequest, recorder); + return recorder.values.get(0); + } + + private JobApi.RunJobResponse prepareAndRunJob() { + JobApi.PrepareJobResponse prepareResponse = prepareJob(); + return runJob(prepareResponse.getPreparationId()); } @Test @@ -82,17 +113,36 @@ public class InMemoryJobServiceTest { } @Test + public void testGetPipelineFailure() { + prepareJob(); + + JobApi.GetJobPipelineRequest request = + JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build(); + RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new RecordingObserver<>(); + service.getPipeline(request, recorder); + // job has not been run yet + assertThat(recorder.isSuccessful(), is(false)); + assertThat(recorder.error, isA(StatusException.class)); + } + + @Test + public void testGetPipelineIsSuccessful() throws Exception { + prepareAndRunJob(); + + JobApi.GetJobPipelineRequest request = + JobApi.GetJobPipelineRequest.newBuilder().setJobId(TEST_JOB_ID).build(); + RecordingObserver<JobApi.GetJobPipelineResponse> recorder = new RecordingObserver<>(); + service.getPipeline(request, recorder); + assertThat(recorder.isSuccessful(), is(true)); + assertThat(recorder.values, hasSize(1)); + JobApi.GetJobPipelineResponse response = recorder.values.get(0); + assertThat(response.getPipeline(), is(TEST_PIPELINE)); + } + + @Test public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception { - // prepare job - JobApi.PrepareJobRequest prepareRequest = - JobApi.PrepareJobRequest.newBuilder() - .setJobName(TEST_JOB_NAME) - .setPipeline(RunnerApi.Pipeline.getDefaultInstance()) - .setPipelineOptions(Struct.getDefaultInstance()) - .build(); - RecordingObserver<JobApi.PrepareJobResponse> prepareRecorder = new RecordingObserver<>(); - service.prepare(prepareRequest, prepareRecorder); - JobApi.PrepareJobResponse prepareResponse = prepareRecorder.values.get(0); + JobApi.PrepareJobResponse prepareResponse = prepareJob(); + // run job JobApi.RunJobRequest runRequest = JobApi.RunJobRequest.newBuilder() diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py index 91ceff7..48193ee 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service.py @@ -131,6 +131,10 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer): return beam_job_api_pb2.GetJobStateResponse( state=self._jobs[request.job_id].state) + def GetPipeline(self, request, context=None): + return beam_job_api_pb2.GetJobPipelineResponse( + pipeline=self._jobs[request.job_id]._pipeline_proto) + def Cancel(self, request, context=None): self._jobs[request.job_id].cancel() return beam_job_api_pb2.CancelJobRequest(