This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ab69f9  Fix IllegalArgumentException in Interval
     new 8d5b7c7  Merge pull request #13993 from baeminbo/patch-1
6ab69f9 is described below

commit 6ab69f9e6cd6f876af325fbc13cc329fb42a068c
Author: Minbo Bae <49642083+baemi...@users.noreply.github.com>
AuthorDate: Sun Feb 14 21:36:40 2021 -0800

    Fix IllegalArgumentException in Interval
    
    If a time correction happens, `endTime` can be before `startTime`, which 
causes `IllegalArgumentException` (the end instant must be greater than the 
start instant).
    
    Compute the elapsed time as `endTime.getMillis() - startTime.getMillis()`, 
which may be a negative value, but I think that's OK because it's only used for 
logging purposes.
---
 .../beam/runners/dataflow/worker/DataflowWorkUnitClient.java      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index 618a451..1590225 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -48,7 +48,6 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
-import org.joda.time.Interval;
 import org.slf4j.Logger;
 
 /** A Dataflow WorkUnit client that fetches WorkItems from the Dataflow 
service. */
@@ -215,14 +214,15 @@ class DataflowWorkUnitClient implements WorkUnitClient {
         && DataflowWorkerLoggingMDC.getStageName() != null) {
       DateTime startTime = stageStartTime.get();
       if (startTime != null) {
-        // This thread should have been tagged with the stage start time 
during getWorkItem(),
-        Interval elapsed = new Interval(startTime, endTime);
+        // elapsed time can be negative by time correction
+        long elapsed = endTime.getMillis() - startTime.getMillis();
         int numErrors = workItemStatus.getErrors() == null ? 0 : 
workItemStatus.getErrors().size();
+        // This thread should have been tagged with the stage start time 
during getWorkItem(),
         logger.info(
             "Finished processing stage {} with {} errors in {} seconds ",
             DataflowWorkerLoggingMDC.getStageName(),
             numErrors,
-            (double) elapsed.toDurationMillis() / 1000);
+            (double) elapsed / 1000);
       }
     }
     shortIdCache.shortenIdsIfAvailable(workItemStatus.getCounterUpdates());

Reply via email to