Repository: beam
Updated Branches:
  refs/heads/master 512ad1355 -> 20788953f


DataflowPipelineJob: handle concurrent cancel and finish

This makes job.cancel() not throw an exception if cancel() is called after the
job has already finished.
(Note that state.isTerminal() is not guaranteed to be up to date.)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09580a73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09580a73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09580a73

Branch: refs/heads/master
Commit: 09580a731e7b67564315a56e43658300011512eb
Parents: 512ad13
Author: Dan Halperin <dhalp...@google.com>
Authored: Tue Apr 4 15:33:21 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Apr 5 18:20:29 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 14 +++++++-
 .../dataflow/DataflowPipelineJobTest.java       | 37 +++++++++++++++-----
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 5ad6f9f..7cb0f0e 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -369,7 +369,19 @@ public class DataflowPipelineJob implements PipelineResult 
{
         } catch (IOException e) {
           State state = getState();
           if (state.isTerminal()) {
-            LOG.warn("Job is already terminated. State is {}", state);
+            LOG.warn("Cancel failed because job is already terminated. State 
is {}", state);
+            return state;
+          } else if (e.getMessage().contains("has terminated")) {
+            // This handles the case where the getState() call above returns 
RUNNING but the cancel
+            // was rejected because the job is in fact done. Hopefully, 
someday we can delete this
+            // code if there is better consistency between the State and 
whether Cancel succeeds.
+            //
+            // Example message:
+            //    Workflow modification failed. Causes: (7603adc9e9bff51e): 
Cannot perform
+            //    operation 'cancel' on Job: 
2017-04-01_22_50_59-9269855660514862348. Job has
+            //    terminated in state SUCCESS: Workflow job: 
2017-04-01_22_50_59-9269855660514862348
+            //    succeeded.
+            LOG.warn("Cancel failed because job is already terminated.", e);
             return state;
           } else {
             String errorMsg = String.format(

http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 108badd..e3d2e4e 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -618,7 +618,6 @@ public class DataflowPipelineJobTest {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("not used in this pipeline");
-
     job.getAggregatorValues(aggregator);
   }
 
@@ -656,7 +655,6 @@ public class DataflowPipelineJobTest {
     thrown.expectCause(is(cause));
     thrown.expectMessage(aggregator.toString());
     thrown.expectMessage("when retrieving Aggregator values for");
-
     job.getAggregatorValues(aggregator);
   }
 
@@ -750,7 +748,7 @@ public class DataflowPipelineJobTest {
         Dataflow.Projects.Locations.Jobs.Update.class);
     when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), 
any(Job.class)))
         .thenReturn(update);
-    when(update.execute()).thenThrow(new IOException());
+    when(update.execute()).thenThrow(new IOException("Some random 
IOException"));
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
@@ -758,13 +756,34 @@ public class DataflowPipelineJobTest {
     thrown.expectMessage("Failed to cancel job in state RUNNING, "
         + "please go to the Developers Console to cancel it manually:");
     job.cancel();
+  }
 
-    Job content = new Job();
-    content.setProjectId(PROJECT_ID);
-    content.setId(JOB_ID);
-    content.setRequestedState("JOB_STATE_CANCELLED");
-    verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), 
eq(content));
-    verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
+  /**
+   * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the 
Dataflow service returns
+   * non-terminal state even though the cancel API call failed, which can 
happen in practice.
+   *
+   * <p>TODO: delete this code if the API calls become consistent.
+   */
+  @Test
+  public void testCancelTerminatedJobWithStaleState() throws IOException {
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_RUNNING");
+    when(mockJobs.get(PROJECT_ID, REGION_ID, 
JOB_ID)).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    Dataflow.Projects.Locations.Jobs.Update update = mock(
+        Dataflow.Projects.Locations.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), 
any(Job.class)))
+        .thenReturn(update);
+    when(update.execute()).thenThrow(new IOException("Job has terminated in 
state SUCCESS"));
+
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    State returned = job.cancel();
+    assertThat(returned, equalTo(State.RUNNING));
+    expectedLogs.verifyWarn("Cancel failed because job is already 
terminated.");
   }
 
   @Test

Reply via email to