This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f7f02a79d Add UnboundedReaderMaxReadTimeMs to 
DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
15f7f02a79d is described below

commit 15f7f02a79d81786083b8cff5e69ed72b361145f
Author: Radosław Stankiewicz <radosl...@google.com>
AuthorDate: Fri Apr 26 14:48:31 2024 +0200

    Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate 
UnboundedReaderMaxReadTimeSec (#31091)
---
 .../options/DataflowPipelineDebugOptions.java      | 27 +++++++++++++++++++++-
 .../dataflow/worker/WorkerCustomSources.java       |  5 ++--
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  6 ++---
 3 files changed, 31 insertions(+), 7 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 30496dec296..7a5284151b9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -216,14 +216,39 @@ public interface DataflowPipelineDebugOptions
 
   void setReaderCacheTimeoutSec(Integer value);
 
-  /** The max amount of time an UnboundedReader is consumed before 
checkpointing. */
+  /**
+   * The max amount of time an UnboundedReader is consumed before 
checkpointing.
+   *
+   * @deprecated use {@link 
DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeMs()} instead
+   */
   @Description(
       "The max amount of time before an UnboundedReader is consumed before 
checkpointing, in seconds.")
+  @Deprecated
   @Default.Integer(10)
   Integer getUnboundedReaderMaxReadTimeSec();
 
   void setUnboundedReaderMaxReadTimeSec(Integer value);
 
+  /** The max amount of time an UnboundedReader is consumed before 
checkpointing. */
+  @Description(
+      "The max amount of time before an UnboundedReader is consumed before 
checkpointing, in millis.")
+  @Default.InstanceFactory(UnboundedReaderMaxReadTimeFactory.class)
+  Integer getUnboundedReaderMaxReadTimeMs();
+
+  void setUnboundedReaderMaxReadTimeMs(Integer value);
+
+  /**
+   * Sets Integer value based on old, deprecated field ({@link
+   * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}).
+   */
+  final class UnboundedReaderMaxReadTimeFactory implements 
DefaultValueFactory<Integer> {
+    @Override
+    public Integer create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
+      return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000;
+    }
+  }
+
   /** The max elements read from an UnboundedReader before checkpointing. */
   @Description("The max elements read from an UnboundedReader before 
checkpointing. ")
   @Default.Integer(10 * 1000)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 8c086016ee9..b965110b3ef 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -796,9 +796,8 @@ public class WorkerCustomSources {
       this.context = context;
       this.started = started;
       DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
-      this.endTime =
-          Instant.now()
-              
.plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
+      long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
+      this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs));
       this.maxElems = debugOptions.getUnboundedReaderMaxElements();
       this.backoffFactory =
           FluentBackoff.DEFAULT
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index d451ec093f7..cc9e6da4a73 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -645,10 +645,10 @@ public class WorkerCustomSourcesTest {
         numReadOnThisIteration++;
       }
       Instant afterReading = Instant.now();
-      long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
+      long maxReadMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
       assertThat(
-          new Duration(beforeReading, afterReading).getStandardSeconds(),
-          lessThanOrEqualTo(maxReadSec + 1));
+          new Duration(beforeReading, afterReading).getMillis(),
+          lessThanOrEqualTo(maxReadMs + 1000L));
       assertThat(
           numReadOnThisIteration, 
lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));
 

Reply via email to