[ https://issues.apache.org/jira/browse/BEAM-3798?focusedWorklogId=81021&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81021 ]

ASF GitHub Bot logged work on BEAM-3798:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Mar/18 23:26
            Start Date: 15/Mar/18 23:26
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4871: [BEAM-3798] Remove error check on dataflow when getting batch job state
URL: https://github.com/apache/beam/pull/4871
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index e163fe8d674..8679a952284 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -181,7 +181,7 @@ private boolean waitForStreamingJobTermination(
   }
 
   /**
-   * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
+   * Return {@code true} if job state is {@code State.DONE}. {@code false} otherwise.
    */
   private boolean waitForBatchJobTermination(
       DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
@@ -195,7 +195,7 @@ private boolean waitForBatchJobTermination(
         return false;
       }
 
-      return job.getState() == State.DONE && !messageHandler.hasSeenError();
+      return job.getState() == State.DONE;
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
index f382e4b6ed2..cf54556a093 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -121,6 +121,40 @@ public void testRunBatchJobThatSucceeds() throws Exception {
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
+  /**
+   * Job success on Dataflow means that it handled transient errors (if any) successfully
+   * by retrying failed bundles.
+   */
+  @Test
+  public void testRunBatchJobThatSucceedsDespiteTransientErrors() throws Exception {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+      .thenAnswer(
+        invocation -> {
+          JobMessage message = new JobMessage();
+          message.setMessageText("TransientError");
+          message.setTime(TimeUtil.toCloudTime(Instant.now()));
+          message.setMessageImportance("JOB_MESSAGE_ERROR");
+          ((JobMessagesHandler) invocation.getArguments()[1]).process(Arrays.asList(message));
+          return State.DONE;
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+      .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
   /**
    * Tests that when a batch job terminates in a failure state even if all assertions
    * passed, it throws an error to that effect.
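
To make the behavioral change in TestDataflowRunner concrete, here is a simplified, self-contained sketch of the batch success rule before and after this PR. JobState and MessageLog are hypothetical stand-ins for Beam's job State and the runner's ErrorMonitorMessagesHandler, not the real classes; only the two boolean expressions are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the batch success check changed in this PR.
 * JobState stands in for the job's State; MessageLog stands in for
 * ErrorMonitorMessagesHandler. Neither is the real Beam class.
 */
public class BatchSuccessCheck {

  enum JobState { RUNNING, DONE, FAILED, CANCELLED }

  /** Records every message logged at JOB_MESSAGE_ERROR importance. */
  static final class MessageLog {
    private final List<String> errors = new ArrayList<>();

    void onMessage(String importance, String text) {
      if ("JOB_MESSAGE_ERROR".equals(importance)) {
        errors.add(text);
      }
    }

    boolean hasSeenError() {
      return !errors.isEmpty();
    }
  }

  /** Old rule: the job must be DONE and no error message may have been seen. */
  static boolean succeededBefore(JobState finalState, MessageLog log) {
    return finalState == JobState.DONE && !log.hasSeenError();
  }

  /** New rule: the final job state alone decides. */
  static boolean succeededAfter(JobState finalState) {
    return finalState == JobState.DONE;
  }

  public static void main(String[] args) {
    MessageLog log = new MessageLog();
    // A transient error that Dataflow retried successfully.
    log.onMessage("JOB_MESSAGE_ERROR", "TransientError");
    JobState finalState = JobState.DONE;

    System.out.println("before: " + succeededBefore(finalState, log)); // before: false
    System.out.println("after:  " + succeededAfter(finalState));       // after:  true
  }
}
```

Under the old rule, a single error-level message (such as a retried SocketTimeoutException) failed the test even though the job finished DONE; under the new rule only the terminal state counts.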


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 81021)
    Time Spent: 40m  (was: 0.5h)

> Performance tests flaky due to Dataflow transient errors
> --------------------------------------------------------
>
>                 Key: BEAM-3798
>                 URL: https://issues.apache.org/jira/browse/BEAM-3798
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Łukasz Gajowy
>            Assignee: Łukasz Gajowy
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Performance tests are flaky due to transient errors that occur during data
> processing (e.g. a SocketTimeoutException while connecting to the DB).
> Currently, exceptions that happen on the Dataflow runner but are successfully
> retried fail the test regardless of the final job state (giving a false-negative result).
> Possible solution for batch scenarios:
> We could "rethrow" exceptions caused by transient errors *only* if the final
> job status is something other than DONE.
> Possible solution for streaming scenarios:
> (don't know yet)
> [Link to discussion on dev list|https://lists.apache.org/thread.html/e480f8181913dc81d2d4cd1430557a646537473ccf29fe6390229098@%3Cdev.beam.apache.org%3E]
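
The merged PR simply stops consulting error messages for batch jobs. The batch proposal in the issue goes a step further: keep the errors and surface them only when the job does not end in DONE. A minimal sketch of that idea follows; RethrowUnlessDone, FinalState, recordError and checkBatchResult are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the issue's batch proposal: remember error messages
 * while a batch job runs and rethrow them only if the job does not end in DONE.
 * FinalState is a stand-in enum, not Beam's job State.
 */
public class RethrowUnlessDone {

  enum FinalState { DONE, FAILED, CANCELLED, UNKNOWN }

  private final List<String> errorMessages = new ArrayList<>();

  /** Called for each error-level job message, e.g. a SocketTimeoutException log line. */
  void recordError(String message) {
    errorMessages.add(message);
  }

  /**
   * Returns normally for DONE (the errors were transient and retried);
   * otherwise throws, attaching every error seen during the run as suppressed.
   */
  void checkBatchResult(FinalState state) {
    if (state == FinalState.DONE) {
      return;
    }
    AssertionError failure =
        new AssertionError(
            "Batch job ended in " + state + " after " + errorMessages.size() + " error(s)");
    for (String message : errorMessages) {
      failure.addSuppressed(new RuntimeException(message));
    }
    throw failure;
  }

  public static void main(String[] args) {
    RethrowUnlessDone check = new RethrowUnlessDone();
    check.recordError("java.net.SocketTimeoutException: connect timed out");

    check.checkBatchResult(FinalState.DONE); // no exception: the error was transient

    try {
      check.checkBatchResult(FinalState.FAILED);
    } catch (AssertionError e) {
      System.out.println(e.getMessage()); // Batch job ended in FAILED after 1 error(s)
    }
  }
}
```

Attaching the recorded messages as suppressed exceptions keeps the root causes visible in the failure report without letting them decide the outcome of a job that finished DONE.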



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
