Repository: beam Updated Branches: refs/heads/master 512ad1355 -> 20788953f
DataflowPipelineJob: handle concurrent cancel and finish This makes job.cancel() not throw an exception if cancel() is called after the job has already finished. (Note that state.isTerminal() is not guaranteed to be up to date.) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09580a73 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09580a73 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09580a73 Branch: refs/heads/master Commit: 09580a731e7b67564315a56e43658300011512eb Parents: 512ad13 Author: Dan Halperin <dhalp...@google.com> Authored: Tue Apr 4 15:33:21 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Apr 5 18:20:29 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/DataflowPipelineJob.java | 14 +++++++- .../dataflow/DataflowPipelineJobTest.java | 37 +++++++++++++++----- 2 files changed, 41 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 5ad6f9f..7cb0f0e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -369,7 +369,19 @@ public class DataflowPipelineJob implements PipelineResult { } catch (IOException e) { State state = getState(); if (state.isTerminal()) { - LOG.warn("Job is already terminated.
State is {}", state); + LOG.warn("Cancel failed because job is already terminated. State is {}", state); + return state; + } else if (e.getMessage().contains("has terminated")) { + // This handles the case where the getState() call above returns RUNNING but the cancel + // was rejected because the job is in fact done. Hopefully, someday we can delete this + // code if there is better consistency between the State and whether Cancel succeeds. + // + // Example message: + // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform + // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has + // terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348 + // succeeded. + LOG.warn("Cancel failed because job is already terminated.", e); return state; } else { String errorMsg = String.format( http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 108badd..e3d2e4e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -618,7 +618,6 @@ public class DataflowPipelineJobTest { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("not used in this pipeline"); - job.getAggregatorValues(aggregator); } @@ -656,7 +655,6 @@ public class DataflowPipelineJobTest { thrown.expectCause(is(cause)); thrown.expectMessage(aggregator.toString()); thrown.expectMessage("when retrieving Aggregator values for"); - 
job.getAggregatorValues(aggregator); } @@ -750,7 +748,7 @@ public class DataflowPipelineJobTest { Dataflow.Projects.Locations.Jobs.Update.class); when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) .thenReturn(update); - when(update.execute()).thenThrow(new IOException()); + when(update.execute()).thenThrow(new IOException("Some random IOException")); DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); @@ -758,13 +756,34 @@ public class DataflowPipelineJobTest { thrown.expectMessage("Failed to cancel job in state RUNNING, " + "please go to the Developers Console to cancel it manually:"); job.cancel(); + } - Job content = new Job(); - content.setProjectId(PROJECT_ID); - content.setId(JOB_ID); - content.setRequestedState("JOB_STATE_CANCELLED"); - verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content)); - verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID); + /** + * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns + * non-terminal state even though the cancel API call failed, which can happen in practice. + * + * <p>TODO: delete this code if the API calls become consistent. 
+ */ + @Test + public void testCancelTerminatedJobWithStaleState() throws IOException { + Dataflow.Projects.Locations.Jobs.Get statusRequest = + mock(Dataflow.Projects.Locations.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Locations.Jobs.Update update = mock( + Dataflow.Projects.Locations.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); + + DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null); + State returned = job.cancel(); + assertThat(returned, equalTo(State.RUNNING)); + expectedLogs.verifyWarn("Cancel failed because job is already terminated."); } @Test