This is an automated email from the ASF dual-hosted git repository. jrmccluskey pushed a commit to branch revert-31037-readerMaxReadTimeSec_as_double in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5c8f548c944a9bbc32e07a13eefaac5493b26808 Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com> AuthorDate: Tue Apr 23 10:42:04 2024 -0400 Revert "Change type for UnboundedReaderMaxReadTimeSec (#31037)" This reverts commit bb310e7e90720b620f1089574f1656ca84a3656d. --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 9 ++++----- .../apache/beam/runners/dataflow/worker/WorkerCustomSources.java | 4 +--- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 7 +++---- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 3f6c47ece68..30496dec296 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,12 +218,11 @@ public interface DataflowPipelineDebugOptions /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time before an UnboundedReader is consumed before checkpointing, " - + "in seconds. Duration can be set to fractions of seconds. ") - @Default.Double(10.0) - double getUnboundedReaderMaxReadTimeSec(); + "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") + @Default.Integer(10) + Integer getUnboundedReaderMaxReadTimeSec(); - void setUnboundedReaderMaxReadTimeSec(double value); + void setUnboundedReaderMaxReadTimeSec(Integer value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index a8e358f19e0..8c086016ee9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -798,9 +798,7 @@ public class WorkerCustomSources { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); this.endTime = Instant.now() - .plus( - Duration.millis( - (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L))); + .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 261567930fe..d451ec093f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -598,7 +598,6 @@ public class WorkerCustomSourcesTest { int maxElements = 10; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); debugOptions.setUnboundedReaderMaxElements(maxElements); - debugOptions.setUnboundedReaderMaxReadTimeSec(10); ByteString state = ByteString.EMPTY; for (int i = 0; i < 10 * maxElements; @@ -646,10 +645,10 @@ public class WorkerCustomSourcesTest { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); + long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); assertThat( - new Duration(beforeReading, afterReading).getMillis(), - lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L))); + new Duration(beforeReading, afterReading).getStandardSeconds(), + lessThanOrEqualTo(maxReadSec + 1)); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));