This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 15f7f02a79d Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
15f7f02a79d is described below

commit 15f7f02a79d81786083b8cff5e69ed72b361145f
Author: Radosław Stankiewicz <radosl...@google.com>
AuthorDate: Fri Apr 26 14:48:31 2024 +0200

    Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
---
 .../options/DataflowPipelineDebugOptions.java      | 27 +++++++++++++++++++++-
 .../dataflow/worker/WorkerCustomSources.java       |  5 ++--
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  6 ++---
 3 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 30496dec296..7a5284151b9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -216,14 +216,39 @@ public interface DataflowPipelineDebugOptions
 
   void setReaderCacheTimeoutSec(Integer value);
 
-  /** The max amount of time an UnboundedReader is consumed before checkpointing. */
+  /**
+   * The max amount of time an UnboundedReader is consumed before checkpointing.
+   *
+   * @deprecated use {@link DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeMs()} instead
+   */
   @Description(
       "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.")
+  @Deprecated
   @Default.Integer(10)
   Integer getUnboundedReaderMaxReadTimeSec();
 
   void setUnboundedReaderMaxReadTimeSec(Integer value);
 
+  /** The max amount of time an UnboundedReader is consumed before checkpointing. */
+  @Description(
+      "The max amount of time before an UnboundedReader is consumed before checkpointing, in millis.")
+  @Default.InstanceFactory(UnboundedReaderMaxReadTimeFactory.class)
+  Integer getUnboundedReaderMaxReadTimeMs();
+
+  void setUnboundedReaderMaxReadTimeMs(Integer value);
+
+  /**
+   * Sets Integer value based on old, deprecated field ({@link
+   * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}).
+   */
+  final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory<Integer> {
+    @Override
+    public Integer create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000;
+    }
+  }
+
   /** The max elements read from an UnboundedReader before checkpointing. */
   @Description("The max elements read from an UnboundedReader before checkpointing. ")
   @Default.Integer(10 * 1000)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 8c086016ee9..b965110b3ef 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -796,9 +796,8 @@ public class WorkerCustomSources {
       this.context = context;
       this.started = started;
       DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      this.endTime =
-          Instant.now()
-              .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
+      long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
+      this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs));
       this.maxElems = debugOptions.getUnboundedReaderMaxElements();
       this.backoffFactory =
           FluentBackoff.DEFAULT
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index d451ec093f7..cc9e6da4a73 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -645,10 +645,10 @@ public class WorkerCustomSourcesTest {
         numReadOnThisIteration++;
       }
       Instant afterReading = Instant.now();
-      long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
+      long maxReadMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
       assertThat(
-          new Duration(beforeReading, afterReading).getStandardSeconds(),
-          lessThanOrEqualTo(maxReadSec + 1));
+          new Duration(beforeReading, afterReading).getMillis(),
+          lessThanOrEqualTo(maxReadMs + 1000L));
       assertThat(
           numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));