This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6ab69f9 Fix IllegalArgumentException in Interval new 8d5b7c7 Merge pull request #13993 from baeminbo/patch-1 6ab69f9 is described below commit 6ab69f9e6cd6f876af325fbc13cc329fb42a068c Author: Minbo Bae <49642083+baemi...@users.noreply.github.com> AuthorDate: Sun Feb 14 21:36:40 2021 -0800 Fix IllegalArgumentException in Interval If a time correction happens, `endTime` can be before `startTime`, which causes an `IllegalArgumentException` (the end instant must be greater than the start instant). Compute the elapsed time as `endTime.getMillis() - startTime.getMillis()`, which may be a negative value, but I think that is OK because it is only used for logging purposes. --- .../beam/runners/dataflow/worker/DataflowWorkUnitClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java index 618a451..1590225 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java @@ -48,7 +48,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.DateTime; import org.joda.time.Duration; -import org.joda.time.Interval; import org.slf4j.Logger; /** A Dataflow WorkUnit client that fetches WorkItems from the Dataflow service. 
*/ @@ -215,14 +214,15 @@ class DataflowWorkUnitClient implements WorkUnitClient { && DataflowWorkerLoggingMDC.getStageName() != null) { DateTime startTime = stageStartTime.get(); if (startTime != null) { - // This thread should have been tagged with the stage start time during getWorkItem(), - Interval elapsed = new Interval(startTime, endTime); + // elapsed time can be negative by time correction + long elapsed = endTime.getMillis() - startTime.getMillis(); int numErrors = workItemStatus.getErrors() == null ? 0 : workItemStatus.getErrors().size(); + // This thread should have been tagged with the stage start time during getWorkItem(), logger.info( "Finished processing stage {} with {} errors in {} seconds ", DataflowWorkerLoggingMDC.getStageName(), numErrors, - (double) elapsed.toDurationMillis() / 1000); + (double) elapsed / 1000); } } shortIdCache.shortenIdsIfAvailable(workItemStatus.getCounterUpdates());