(beam) branch master updated: Enable MapState and SetState for dataflow streaming engine pipelines with legacy runner by building on top of MultimapState. (#31453)

2024-07-04 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c08afeae60d Enable MapState and SetState for dataflow streaming engine 
pipelines with legacy runner by building on top of MultimapState. (#31453)
c08afeae60d is described below

commit c08afeae60dfb1a15a0f4c8669085662a847249f
Author: Sam Whittle 
AuthorDate: Thu Jul 4 22:22:21 2024 +0200

Enable MapState and SetState for dataflow streaming engine pipelines with 
legacy runner by building on top of MultimapState. (#31453)
---
 CHANGES.md |   1 +
 .../org/apache/beam/runners/core/StateTags.java|   8 +
 .../beam/runners/dataflow/DataflowRunner.java  |  35 +---
 .../beam/runners/dataflow/DataflowRunnerTest.java  |  59 --
 .../dataflow/worker/StreamingDataflowWorker.java   |  11 +-
 .../worker/windmill/state/AbstractWindmillMap.java |  23 +++
 .../worker/windmill/state/CachingStateTable.java   |  53 +++--
 .../worker/windmill/state/WindmillMap.java |  24 +--
 .../windmill/state/WindmillMapViaMultimap.java | 164 +++
 .../worker/windmill/state/WindmillMultimap.java|   4 +-
 .../worker/windmill/state/WindmillSet.java |  36 +---
 .../worker/windmill/state/WindmillStateCache.java  |  46 +++--
 .../windmill/state/WindmillStateInternals.java |  14 +-
 .../worker/StreamingModeExecutionContextTest.java  |   5 +-
 .../dataflow/worker/WindmillStateTestUtils.java|   2 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |   5 +-
 .../windmill/state/WindmillStateCacheTest.java |   2 +-
 .../windmill/state/WindmillStateInternalsTest.java | 225 -
 .../refresh/DispatchedActiveWorkRefresherTest.java |   2 +-
 .../java/org/apache/beam/sdk/state/StateSpecs.java |  23 +++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  28 ++-
 21 files changed, 573 insertions(+), 197 deletions(-)
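
The mechanism is easy to picture from the diffstat: the new WindmillMapViaMultimap presents a map view over multimap-backed state. Below is a minimal, hypothetical sketch of that delegation using simplified stand-in interfaces (Beam's real MapState/MultimapState wrap results in ReadableState and support async reads); it only illustrates the single-value-per-key invariant such an adapter must maintain.

    // Hypothetical, simplified stand-in for illustration only.
    interface SimpleMultimap<K, V> {
      void put(K key, V value);
      Iterable<V> get(K key);
      void remove(K key); // removes every value under the key
      Iterable<K> keys();
    }

    // A map view over a multimap: correct as long as each key holds at most
    // one value, which put() maintains by removing before inserting.
    final class MapViaMultimap<K, V> {
      private final SimpleMultimap<K, V> backing;

      MapViaMultimap(SimpleMultimap<K, V> backing) {
        this.backing = backing;
      }

      void put(K key, V value) {
        backing.remove(key); // enforce at most one value per key
        backing.put(key, value);
      }

      V get(K key) {
        for (V value : backing.get(key)) {
          return value; // at most one value by construction
        }
        return null;
      }

      void remove(K key) {
        backing.remove(key);
      }

      Iterable<K> keys() {
        return backing.keys();
      }
    }

SetState then follows with the same trick, a set being a map from element to a dummy value, which matches the existing WindmillSet layering over a map seen in WindmillSet.java above.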

diff --git a/CHANGES.md b/CHANGES.md
index 38fa6e44b73..0a620038f11 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 
 * Multiple RunInference instances can now share the same model instance by 
setting the model_identifier parameter (Python) 
([#31665](https://github.com/apache/beam/issues/31665)).
 * Removed a 3rd party LGPL dependency from the Go SDK 
([#31765](https://github.com/apache/beam/issues/31765)).
+* Support for MapState and SetState when using Dataflow Runner v1 with 
Streaming Engine (Java) 
([#18200](https://github.com/apache/beam/issues/18200)).
 
 ## Breaking Changes
 
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 7ffb10c85c0..6ed7f8525fd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -257,6 +257,14 @@ public class StateTags {
 new StructuredId(setTag.getId()), 
StateSpecs.convertToMapSpecInternal(setTag.getSpec()));
   }
 
+  public static <KeyT, ValueT> StateTag<MultimapState<KeyT, ValueT>> convertToMultiMapTagInternal(
+      StateTag<MapState<KeyT, ValueT>> mapTag) {
+    StateSpec<MapState<KeyT, ValueT>> spec = mapTag.getSpec();
+    StateSpec<MultimapState<KeyT, ValueT>> multimapSpec =
+        StateSpecs.convertToMultimapSpecInternal(spec);
+    return new SimpleStateTag<>(new StructuredId(mapTag.getId()), multimapSpec);
+  }
+
   private static class StructuredId implements Serializable {
 private final StateKind kind;
 private final String rawId;
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index de566599bf8..708c6341326 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2564,11 +2564,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         || hasExperiment(options, "use_portable_job_submission");
   }
 
-  static boolean useStreamingEngine(DataflowPipelineOptions options) {
-    return hasExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT)
-        || hasExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
-  }
-
   static void verifyDoFnSupported(
       DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
     if (!streaming && DoFnSignatures.usesMultimapState(fn)) {
@@ -2583,8 +2578,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           "%s does not currently support @RequiresTimeSortedInput in streaming mode.",
           DataflowRunner.class.getSimpleName()));
     }
-
-    boolean streamingEngine = useStreamin

(beam) branch master updated (0c89a0edb9c -> a5eee589697)

2024-07-04 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 0c89a0edb9c Fix playground snippets (#31778)
 add a5eee589697 Fix flaky StreamingDataflowWorkerTest which wasn't waiting 
for enough commits. (#31781)

No new revisions were added by this update.

Summary of changes:
 .../beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



(beam) branch master updated: [Dataflow Streaming] Enabled Heartbeat by Default (#31689)

2024-07-01 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ef5060416d9 [Dataflow Streaming] Enabled Heartbeat by Default (#31689)
ef5060416d9 is described below

commit ef5060416d9fed2e08a6682e69657c6fa9f98af4
Author: TongruiLi <12992126+tongru...@users.noreply.github.com>
AuthorDate: Mon Jul 1 06:04:18 2024 -0700

[Dataflow Streaming] Enabled Heartbeat by Default (#31689)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 +--
 .../windmill/client/grpc/GrpcWindmillServer.java   |  4 +--
 .../client/grpc/GrpcWindmillServerTest.java| 37 +-
 3 files changed, 19 insertions(+), 26 deletions(-)
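
In short, the new heartbeat requests flip from opt-in (the old streaming_engine_send_new_heartbeat_requests experiment) to opt-out. A hedged sketch of restoring the previous behavior; the experiment name comes from the diff below, the rest is standard Beam options plumbing:

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HeartbeatOptOut {
      public static void main(String[] args) {
        // Pass the experiment to opt out; omit it to get the new default
        // (new heartbeat requests enabled).
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--experiments=streaming_engine_disable_new_heartbeat_requests")
                .create();
        System.out.println(
            ExperimentalOptions.hasExperiment(
                options, "streaming_engine_disable_new_heartbeat_requests")); // true
      }
    }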

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index fc1be2cd137..59819db88a0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -602,8 +602,8 @@ public class StreamingDataflowWorker {
 
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
 .setSendKeyedGetDataRequests(
 !options.isEnableStreamingEngine()
-|| !DataflowRunner.hasExperiment(
-options, "streaming_engine_send_new_heartbeat_requests"));
+|| DataflowRunner.hasExperiment(
+options, 
"streaming_engine_disable_new_heartbeat_requests"));
   }
 
   private static BoundedQueueExecutor 
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index abf85d98548..0ab03a80318 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -176,8 +176,8 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
 testOptions(/* enableStreamingEngine= */ true, experiments);
 boolean sendKeyedGetDataRequests =
 !testOptions.isEnableStreamingEngine()
-|| !DataflowRunner.hasExperiment(
-testOptions, "streaming_engine_send_new_heartbeat_requests");
+|| DataflowRunner.hasExperiment(
+testOptions, 
"streaming_engine_disable_new_heartbeat_requests");
 GrpcWindmillStreamFactory windmillStreamFactory =
 GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
 .setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index b1d5309e12d..6473d5527a8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -124,8 +124,16 @@ public class GrpcWindmillServerTest {
 
   @Before
   public void setUp() throws Exception {
-String name = "Fake server for " + getClass();
+startServerAndClient(new ArrayList<>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+server.shutdownNow();
+  }
 
+  private void startServerAndClient(List<String> experiments) throws Exception {
+String name = "Fake server for " + getClass();
 this.server =
 InProcessServerBuilder.forName(name)
 .fallbackHandlerRegistry(serviceRegistry)
@@ -136,17 +144,12 @@ public class GrpcWindmillServerTest {
 this.client =
 GrpcWindmillServer.newTestInstance(
 name,
-new ArrayList<>(),
+experiments,
 clientId,
 new FakeWindmillStubFactory(
 () -> 
grpcCleanup.register(WindmillChannelFactory.inPro

(beam) branch master updated: move heartbeat processor to where it is being used (#31298)

2024-06-28 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ec1fb23ece move heartbeat processor to where it is being used (#31298)
6ec1fb23ece is described below

commit 6ec1fb23ece81721806bdc1323ffb23fa7ce55a0
Author: martin trieu 
AuthorDate: Fri Jun 28 02:31:11 2024 -0700

move heartbeat processor to where it is being used (#31298)
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 172 +
 .../windmill/client/grpc/GrpcWindmillServer.java   |  53 +++
 .../client/grpc/GrpcWindmillStreamFactory.java | 101 +++-
 .../client/grpc/StreamingEngineClient.java |  22 +--
 .../windmill/client/grpc/WindmillStreamSender.java |  19 +--
 .../client/grpc/StreamingEngineClientTest.java |   5 +-
 .../client/grpc/WindmillStreamSenderTest.java  |  14 +-
 .../budget/EvenGetWorkBudgetDistributorTest.java   |   3 +-
 8 files changed, 210 insertions(+), 179 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 98015e2ea71..fc1be2cd137 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -103,7 +103,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -121,11 +120,6 @@ public class StreamingDataflowWorker {
   MetricName.named(
   
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
   "throttling-msecs");
-  // Maximum number of threads for processing.  Currently each thread 
processes one key at a time.
-  static final int MAX_PROCESSING_THREADS = 300;
-  static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
-  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
 
   /**
* Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the 
amount of data sinked
@@ -135,13 +129,20 @@ public class StreamingDataflowWorker {
*/
   public static final int MAX_SINK_BYTES = 10_000_000;
 
+  // Maximum number of threads for processing.  Currently, each thread 
processes one key at a time.
+  static final int MAX_PROCESSING_THREADS = 300;
+  static final long THREAD_EXPIRATION_TIME_SEC = 60;
+  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingDataflowWorker.class);
+
   /** The idGenerator to generate unique id globally. */
   private static final IdGenerator ID_GENERATOR = 
IdGenerators.decrementingLongs();
 
   private static final int DEFAULT_STATUS_PORT = 8081;
   // Maximum size of the result of a GetWork request.
   private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
+
   /** Maximum number of failure stacktraces to report in each update sent to 
backend. */
   private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
 
@@ -328,39 +329,27 @@ public class StreamingDataflowWorker {
 threadName ->
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder().setNameFormat(threadName).build());
-

(beam) branch scwhittle-update-bq-docs deleted (was 9dfbc22fae9)

2024-06-11 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch scwhittle-update-bq-docs
in repository https://gitbox.apache.org/repos/asf/beam.git


 was 9dfbc22fae9 Remove notes now that Dataflow v2 supports BQ autosharding

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(beam) branch master updated: Remove notes now that Dataflow v2 supports BQ autosharding (#31561)

2024-06-11 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 462c8334903 Remove notes now that Dataflow v2 supports BQ autosharding 
(#31561)
462c8334903 is described below

commit 462c8334903c82917897cbe4c34d7aa8fe2bd375
Author: Sam Whittle 
AuthorDate: Tue Jun 11 20:14:45 2024 +0200

Remove notes now that Dataflow v2 supports BQ autosharding (#31561)
---
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md  | 4 
 1 file changed, 4 deletions(-)

diff --git 
a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md 
b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 50fd5b8258d..d49e9bac949 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -789,9 +789,6 @@ BigQuery Storage Write API for Python SDK currently has 
some limitations on supp
 
 {{< paragraph class="language-py" >}}
 **Note:** If you want to run WriteToBigQuery with Storage Write API from the 
source code, you need to run `./gradlew 
:sdks:java:io:google-cloud-platform:expansion-service:build` to build the 
expansion-service jar. If you are running from a released Beam SDK, the jar is 
already included.
-
-**Note:** Auto sharding is not currently supported for Python's Storage Write 
API exactly-once mode on DataflowRunner.
-
 {{< /paragraph >}}
 
  Exactly-once semantics
@@ -883,7 +880,6 @@ explicitly enable this using 
[`withAutoSharding`](https://beam.apache.org/releas
 `STORAGE_WRITE_API` defaults to dynamic sharding when
 `numStorageWriteApiStreams` is set to 0 or is unspecified.
 
-***Note:*** Auto sharding with `STORAGE_WRITE_API` is supported by Dataflow, 
but **not** on Runner v2.
 {{< /paragraph >}}
 
 When using `STORAGE_WRITE_API`, the `PCollection` returned by



(beam) branch scwhittle-update-bq-docs created (now 9dfbc22fae9)

2024-06-11 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch scwhittle-update-bq-docs
in repository https://gitbox.apache.org/repos/asf/beam.git


  at 9dfbc22fae9 Remove notes now that Dataflow v2 supports BQ autosharding

This branch includes the following new commits:

 new 9dfbc22fae9 Remove notes now that Dataflow v2 supports BQ autosharding

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(beam) 01/01: Remove notes now that Dataflow v2 supports BQ autosharding

2024-06-11 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch scwhittle-update-bq-docs
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9dfbc22fae9fc767ad2e7a744af8c31bd5dc99b9
Author: Sam Whittle 
AuthorDate: Tue Jun 11 11:14:11 2024 +0200

Remove notes now that Dataflow v2 supports BQ autosharding
---
 .../www/site/content/en/documentation/io/built-in/google-bigquery.md  | 4 
 1 file changed, 4 deletions(-)

diff --git 
a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md 
b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 50fd5b8258d..d49e9bac949 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -789,9 +789,6 @@ BigQuery Storage Write API for Python SDK currently has 
some limitations on supp
 
 {{< paragraph class="language-py" >}}
 **Note:** If you want to run WriteToBigQuery with Storage Write API from the 
source code, you need to run `./gradlew 
:sdks:java:io:google-cloud-platform:expansion-service:build` to build the 
expansion-service jar. If you are running from a released Beam SDK, the jar is 
already included.
-
-**Note:** Auto sharding is not currently supported for Python's Storage Write 
API exactly-once mode on DataflowRunner.
-
 {{< /paragraph >}}
 
  Exactly-once semantics
@@ -883,7 +880,6 @@ explicitly enable this using 
[`withAutoSharding`](https://beam.apache.org/releas
 `STORAGE_WRITE_API` defaults to dynamic sharding when
 `numStorageWriteApiStreams` is set to 0 or is unspecified.
 
-***Note:*** Auto sharding with `STORAGE_WRITE_API` is supported by Dataflow, 
but **not** on Runner v2.
 {{< /paragraph >}}
 
 When using `STORAGE_WRITE_API`, the `PCollection` returned by



(beam) branch master updated: Fix incorrect Work.java cast and logging (#31528)

2024-06-11 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 0d037a993b5 Fix incorrect Work.java cast and logging (#31528)
0d037a993b5 is described below

commit 0d037a993b51406e6a6f630791a8d17bfbb608a7
Author: martin trieu 
AuthorDate: Tue Jun 11 00:35:47 2024 -0700

Fix incorrect Work.java cast and logging (#31528)
---
 .../runners/dataflow/worker/streaming/Work.java|  5 +-
 .../dataflow/worker/util/BoundedQueueExecutor.java | 13 +++-
 .../worker/util/BoundedQueueExecutorTest.java  | 76 --
 3 files changed, 81 insertions(+), 13 deletions(-)
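
The logging half of the fix is the extracted helper visible in the diff below: the work token is rendered as zero-padded 16-digit hex before being appended to the thread name. A standalone sketch of that formatting (the helper body matches the diff; the main method is illustrative):

    public class WorkTokenFormat {
      // Same formatting as the extracted debugFormattedWorkToken helper:
      // zero-padded 16-digit lowercase hex, so tokens align in logs.
      static String debugFormattedWorkToken(long workToken) {
        return String.format("%016x", workToken);
      }

      public static void main(String[] args) {
        System.out.println(debugFormattedWorkToken(255L)); // 00000000000000ff
      }
    }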

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index fa46bac36b5..ed3f2671b40 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -58,7 +58,7 @@ import org.joda.time.Instant;
  */
 @NotThreadSafe
 @Internal
-public class Work {
+public final class Work {
   private final ShardedKey shardedKey;
   private final WorkItem workItem;
   private final ProcessingContext processingContext;
@@ -196,8 +196,7 @@ public class Work {
 return latencyTrackingId;
   }
 
-  public final void queueCommit(
-  WorkItemCommitRequest commitRequest, ComputationState computationState) {
+  public void queueCommit(WorkItemCommitRequest commitRequest, 
ComputationState computationState) {
 setState(State.COMMIT_QUEUED);
 processingContext.workCommitter().accept(Commit.create(commitRequest, 
computationState, this));
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9a481169350..5e3f293f7d5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -22,7 +22,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
 
@@ -224,9 +225,10 @@ public class BoundedQueueExecutor {
   () -> {
 String threadName = Thread.currentThread().getName();
 try {
-  if (work instanceof Work) {
+  if (work instanceof ExecutableWork) {
 String workToken =
-String.format("%016x", ((Work) 
work).getWorkItem().getWorkToken());
+debugFormattedWorkToken(
+((ExecutableWork) 
work).work().getWorkItem().getWorkToken());
 Thread.currentThread().setName(threadName + ":" + workToken);
   }
   work.run();
@@ -242,6 +244,11 @@ public class BoundedQueueExecutor {
 }
   }
 
+  @VisibleForTesting
+  public static String debugFormattedWorkToken(long workToken) {
+return String.format("%016x", workToken);
+  }
+
   private void decrementCounters(long workBytes) {
 monitor.enter();
 --elementsOutstanding;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index c0620952ef9..e08c951975f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -23,9 +23,17 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
 import java.util.c

(beam) branch master updated (7b6f9415c10 -> f93a67a6ae8)

2024-05-31 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 7b6f9415c10 Updates Expansion Service Container to support upgrading 
using the schema-transform ID (#31451)
 add f93a67a6ae8 remove processing/scheduling logic from 
StreamingDataflowWorker (#31317)

No new revisions were added by this update.

Summary of changes:
 .../beam/runners/dataflow/worker/PubsubReader.java |   2 +-
 .../beam/runners/dataflow/worker/ReaderCache.java  |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   | 614 +++--
 .../worker/StreamingModeExecutionContext.java  | 176 +++---
 .../dataflow/worker/UngroupedWindmillReader.java   |   2 +-
 .../dataflow/worker/WindmillTimerInternals.java|  39 +-
 .../dataflow/worker/WindowingWindmillReader.java   |   2 +-
 .../dataflow/worker/WorkerCustomSources.java   |   2 +-
 .../dataflow/worker/streaming/ActiveWorkState.java |  77 +--
 .../worker/streaming/ComputationState.java |  52 +-
 .../worker/streaming/ComputationStateCache.java|   1 +
 .../worker/streaming/ComputationWorkExecutor.java  | 118 
 .../{ShardedKey.java => ExecutableWork.java}   |  26 +-
 .../dataflow/worker/streaming/ExecutionState.java  |  54 --
 .../dataflow/worker/streaming/Watermarks.java  |  69 +++
 .../runners/dataflow/worker/streaming/Work.java| 289 ++
 .../runners/dataflow/worker/streaming/WorkId.java  |   8 +
 .../streaming/sideinput/SideInputStateFetcher.java |   2 +
 .../worker/util/common/worker/MapTaskExecutor.java |   2 +-
 .../client/grpc/GrpcDirectGetWorkStream.java   |  53 +-
 .../windmill/client/grpc/GrpcDispatcherClient.java |  32 +-
 .../client/grpc/GrpcWindmillStreamFactory.java |  11 +-
 .../client/grpc/StreamingEngineClient.java | 161 --
 .../windmill/client/grpc/WindmillStreamSender.java |  37 +-
 .../worker/windmill/state/WindmillStateReader.java |  32 +-
 .../windmill/work/ProcessWorkItemClient.java   |  52 --
 ...rkItemProcessor.java => WorkItemScheduler.java} |  29 +-
 .../processing/ComputationWorkExecutorFactory.java | 291 ++
 .../work/processing/StreamingCommitFinalizer.java  |  85 +++
 .../work/processing/StreamingWorkScheduler.java| 428 ++
 .../processing/failures/WorkFailureProcessor.java  |  16 +-
 .../runners/dataflow/worker/PubsubReaderTest.java  |   2 +-
 .../worker/StreamingDataflowWorkerTest.java| 172 +++---
 .../worker/StreamingModeExecutionContextTest.java  |  40 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  83 +--
 .../worker/streaming/ActiveWorkStateTest.java  | 213 ---
 .../streaming/ComputationStateCacheTest.java   |  45 +-
 .../StreamingApplianceWorkCommitterTest.java   |  19 +-
 .../commits/StreamingEngineWorkCommitterTest.java  |  28 +-
 .../client/grpc/StreamingEngineClientTest.java |  37 +-
 .../client/grpc/WindmillStreamSenderTest.java  |  42 +-
 .../budget/EvenGetWorkBudgetDistributorTest.java   |  14 +-
 .../failures/WorkFailureProcessorTest.java |  75 ++-
 .../refresh/DispatchedActiveWorkRefresherTest.java |  54 +-
 44 files changed, 2182 insertions(+), 1408 deletions(-)
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
 copy 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/{ShardedKey.java
 => ExecutableWork.java} (58%)
 delete mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Watermarks.java
 delete mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/ProcessWorkItemClient.java
 rename 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/{WorkItemProcessor.java
 => WorkItemScheduler.java} (61%)
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java



(beam) branch master updated: Add options to specify read and write http timeout for gcs as well as lower batching limit for rewrite operations which are copying. (#31410)

2024-05-29 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 49a4290426d Add options to specify read and write http timeout for gcs 
as well as lower batching limit for rewrite operations which are copying. 
(#31410)
49a4290426d is described below

commit 49a4290426d5cfad56b5d9977f26517de5036885
Author: Sam Whittle 
AuthorDate: Wed May 29 15:37:34 2024 +0200

Add options to specify read and write http timeout for gcs as well as lower 
batching limit for rewrite operations which are copying. (#31410)
---
 .../sdk/extensions/gcp/options/GcsOptions.java |  18 +++
 .../beam/sdk/extensions/gcp/util/GcsUtil.java  | 142 +
 .../gcp/util/RetryHttpRequestInitializer.java  |  15 ++-
 .../beam/sdk/extensions/gcp/util/Transport.java|  41 +++---
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  |  83 +---
 5 files changed, 211 insertions(+), 88 deletions(-)
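
For reference, a hedged sketch of how a pipeline would set the three options this commit adds to GcsOptions (option names come from the diff below; the values are illustrative, with timeouts in milliseconds per the setters' parameter names):

    import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class GcsTimeoutOptionsExample {
      public static void main(String[] args) {
        GcsOptions options = PipelineOptionsFactory.create().as(GcsOptions.class);
        options.setGcsHttpRequestReadTimeout(30_000);  // read timeout, ms
        options.setGcsHttpRequestWriteTimeout(60_000); // write timeout, ms
        // Smaller batches for rewrite ops that actually copy data, since each
        // such op is far heavier than a metadata-only rewrite.
        options.setGcsRewriteDataOpBatchLimit(10);
      }
    }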

diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 3b2461dcb0e..175d8f58de4 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -124,6 +124,24 @@ public interface GcsOptions extends 
ApplicationNameOptions, GcpOptions, Pipeline
 
   void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics);
 
+  @Description("Read timeout for gcs http requests")
+  @Nullable
+  Integer getGcsHttpRequestReadTimeout();
+
+  void setGcsHttpRequestReadTimeout(@Nullable Integer timeoutMs);
+
+  @Description("Write timeout for gcs http requests.")
+  @Nullable
+  Integer getGcsHttpRequestWriteTimeout();
+
+  void setGcsHttpRequestWriteTimeout(@Nullable Integer timeoutMs);
+
+  @Description("Batching limit for rewrite ops which will copy data.")
+  @Nullable
+  Integer getGcsRewriteDataOpBatchLimit();
+
+  void setGcsRewriteDataOpBatchLimit(@Nullable Integer timeoutMs);
+
   /**
* Returns the default {@link ExecutorService} to use within the Apache Beam 
SDK. The {@link
* ExecutorService} is compatible with AppEngine.
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 60e8443d264..9e790002ecd 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -86,6 +86,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.MoreFutures;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -123,7 +124,8 @@ public class GcsUtil {
   gcsOptions.getExecutorService(),
   hasExperiment(options, "use_grpc_for_gcs"),
   gcsOptions.getGcpCredential(),
-  gcsOptions.getGcsUploadBufferSizeBytes());
+  gcsOptions.getGcsUploadBufferSizeBytes(),
+  gcsOptions.getGcsRewriteDataOpBatchLimit());
 }
 
 /** Returns an instance of {@link GcsUtil} based on the given parameters. 
*/
@@ -140,7 +142,8 @@ public class GcsUtil {
   executorService,
   hasExperiment(options, "use_grpc_for_gcs"),
   credentials,
-  uploadBufferSizeBytes);
+  uploadBufferSizeBytes,
+  null);
 }
   }
 
@@ -154,6 +157,8 @@ public class GcsUtil {
 
   /** Maximum number of requests permitted in a GCS batch request. */
   private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /** Default maximum number of requests permitted in a GCS batch request 
where data is copied. */
+  private static final int MAX_REQUESTS_PER_COPY_BATCH = 10;
   /** Maximum number of concurrent batches of requests executing on GCS. */
   private static final int MAX_CONCURRENT_BATCHES = 256;
 
@@ -179,11 +184,13 @@ public class GcsUtil {
   // Exposed for testing.
   final Executo

(beam) branch master updated: PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)

2024-05-21 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 88af35e6df4 PubsubIO: Add 
readMessagesWithAttributesWithCoderAndParseFn (#31206)
88af35e6df4 is described below

commit 88af35e6df4de91a05b6e637ce840742700187c3
Author: Maja Kontrec Rönn 
AuthorDate: Tue May 21 12:22:31 2024 +0200

PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)
---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java| 11 ++
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java   | 43 ++
 2 files changed, 54 insertions(+)
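
A hedged usage sketch of the new entry point (the subscription path and attribute key are hypothetical; the method and builder chain come from the diff below):

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadWithAttributesExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<String> tagged =
            p.apply(
                PubsubIO.readMessagesWithAttributesWithCoderAndParseFn(
                        StringUtf8Coder.of(),
                        new SimpleFunction<PubsubMessage, String>() {
                          @Override
                          public String apply(PubsubMessage msg) {
                            // The parse fn now sees payload *and* attributes.
                            return new String(msg.getPayload(), StandardCharsets.UTF_8)
                                + "|"
                                + msg.getAttributeMap().get("source");
                          }
                        })
                    .fromSubscription("projects/my-project/subscriptions/my-sub"));
        p.run().waitUntilFinish();
      }
    }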

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 1d687812560..01848d92d92 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -671,6 +671,17 @@ public class PubsubIO {
 return Read.newBuilder(parseFn).setCoder(coder).build();
   }
 
+  /**
+   * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream,
+   * mapping each {@link PubsubMessage}, with attributes, into type T using the supplied parse
+   * function and coder. Similar to {@link #readMessagesWithCoderAndParseFn(Coder, SimpleFunction)},
+   * but with the addition of making the message attributes available to the ParseFn.
+   */
+  public static <T> Read<T> readMessagesWithAttributesWithCoderAndParseFn(
+      Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
+    return Read.newBuilder(parseFn).setCoder(coder).setNeedsAttributes(true).build();
+  }
+
   /**
* Returns a {@link PTransform} that continuously reads binary encoded Avro 
messages into the Avro
* {@link GenericRecord} type.
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index abc35d0bb1b..fe6338a501c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -780,6 +780,49 @@ public class PubsubIOTest {
 pipeline.run();
   }
 
+  static class AppendSuffixAttributeToStringPayloadParseFn
+      extends SimpleFunction<PubsubMessage, String> {
+@Override
+public String apply(PubsubMessage input) {
+  String payload = new String(input.getPayload(), StandardCharsets.UTF_8);
+  String suffixAttribute = input.getAttributeMap().get("suffix");
+  return payload + suffixAttribute;
+}
+  }
+
+  private IncomingMessage messageWithSuffixAttribute(String payload, String 
suffix) {
+return IncomingMessage.of(
+com.google.pubsub.v1.PubsubMessage.newBuilder()
+.setData(ByteString.copyFromUtf8(payload))
+.putAttributes("suffix", suffix)
+.build(),
+1234L,
+0,
+UUID.randomUUID().toString(),
+UUID.randomUUID().toString());
+  }
+
+  @Test
+  public void testReadMessagesWithAttributesWithCoderAndParseFn() {
+    ImmutableList<IncomingMessage> inputs =
+        ImmutableList.of(
+            messageWithSuffixAttribute("foo", "-some-suffix"),
+            messageWithSuffixAttribute("bar", "-some-other-suffix"));
+    clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 60, inputs);
+
+    PCollection<String> read =
+        pipeline.apply(
+            PubsubIO.readMessagesWithAttributesWithCoderAndParseFn(
+                    StringUtf8Coder.of(), new AppendSuffixAttributeToStringPayloadParseFn())
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+
+    List<String> outputs = ImmutableList.of("foo-some-suffix", "bar-some-other-suffix");
+    PAssert.that(read).containsInAnyOrder(outputs);
+    pipeline.run();
+  }
+
   @Test
   public void testDynamicTopicsBounded() throws IOException {
 testDynamicTopics(true);



(beam) branch master updated: Make GrpcCommitWorkStream thread-safe as documented by moving batcher out of it. (#31304)

2024-05-21 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 89795c0e7c7 Make GrpcCommitWorkStream thread-safe as documented by 
moving batcher out of it. (#31304)
89795c0e7c7 is described below

commit 89795c0e7c731f37f737c908ede9cb9d3b578a24
Author: Sam Whittle 
AuthorDate: Tue May 21 10:53:04 2024 +0200

Make GrpcCommitWorkStream thread-safe as documented by moving batcher out 
of it. (#31304)

Also increase the number of streams in commit cache to number of threads
---
 .../dataflow/worker/StreamingDataflowWorker.java   |   3 +-
 .../worker/windmill/client/WindmillStream.java |  39 +--
 .../commits/StreamingApplianceWorkCommitter.java   |   3 +-
 .../commits/StreamingEngineWorkCommitter.java  |  43 
 .../windmill/client/grpc/GrpcCommitWorkStream.java |  72 ++--
 .../dataflow/worker/FakeWindmillServer.java| 104 +++---
 .../commits/StreamingEngineWorkCommitterTest.java  |  57 --
 .../client/grpc/GrpcWindmillServerTest.java| 121 +++--
 8 files changed, 290 insertions(+), 152 deletions(-)
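
The resulting call pattern is that each committer thread owns its own batcher and, when commitWorkItem returns false, flushes and re-offers the item. A minimal sketch of that contract, with placeholder generic types standing in for the Windmill proto messages (the real interface appears in the diff below):

    import java.io.Closeable;
    import java.util.function.Consumer;

    // Simplified stand-in for CommitWorkStream.RequestBatcher; RequestT and
    // StatusT replace the Windmill proto types of the real interface.
    interface Batcher<RequestT, StatusT> extends Closeable {
      boolean commitWorkItem(String computation, RequestT request, Consumer<StatusT> onDone);

      void flush();

      @Override
      default void close() {
        flush();
      }
    }

    final class CommitLoop {
      // A false return means the batch is full: flush, then re-offer the item.
      static <R, S> void commitAll(Batcher<R, S> batcher, String computation, Iterable<R> items) {
        try (Batcher<R, S> b = batcher) {
          for (R item : items) {
            while (!b.commitWorkItem(computation, item, status -> {})) {
              b.flush();
            }
          }
        } // close() flushes anything still pending
      }
    }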

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f18d5fac721..78b7d3c9dfe 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -155,7 +155,6 @@ public class StreamingDataflowWorker {
   // Maximum number of threads for processing.  Currently each thread 
processes one key at a time.
   static final int MAX_PROCESSING_THREADS = 300;
   static final long THREAD_EXPIRATION_TIME_SEC = 60;
-  static final int NUM_COMMIT_STREAMS = 1;
   static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
   static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
 
@@ -280,7 +279,7 @@ public class StreamingDataflowWorker {
 windmillServiceEnabled
 ? StreamingEngineWorkCommitter.create(
 WindmillStreamPool.create(
-NUM_COMMIT_STREAMS, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
 ::getCloseableStream,
 numCommitThreads,
 this::onCompleteCommit)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
index 7c22f4fb576..d044e930079 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.client;
 
+import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
@@ -68,21 +70,34 @@ public interface WindmillStream {
   /** Interface for streaming CommitWorkRequests to Windmill. */
   @ThreadSafe
   interface CommitWorkStream extends WindmillStream {
+@NotThreadSafe
+interface RequestBatcher extends Closeable {
+  /**
+   * Commits a work item and running onDone when the commit has been 
processed by the server.
+   * Returns true if the request was accepted. If false is returned the 
stream should be flushed
+   * and the request recommitted.
+   *
+   * onDone will be called with the status of the commit.
+   */
+  boolean commitWorkItem(
+      String computation,
+      Windmill.WorkItemCommitRequest request,
+      Consumer<Windmill.CommitStatus> onDone);
+
+  /** Flushes any pending work items to the wire. */
+  void flush();
+
+  @Override
+  default void close() {
+flush();
+  }
+}
 
 /**
- * Commits a work item and running onDone when the commit has been 
processed by the server.
- * Returns true if the request was accepted. If false is returned

(beam) branch master updated: Implementing lull reporting at bundle level processing (#30693)

2024-04-29 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 61ca405d372 Implementing lull reporting at bundle level processing 
(#30693)
61ca405d372 is described below

commit 61ca405d372b02c985f641e96d10e79f61400f4c
Author: Arvind Ram 
AuthorDate: Mon Apr 29 06:15:45 2024 -0700

Implementing lull reporting at bundle level processing (#30693)

This reverts commit 50c59912bc002947c335170b42827b278b78aae1.
* reset lull state cleanup during deactivate
---
 .../core/metrics/ExecutionStateTracker.java|  41 +++-
 .../dataflow/worker/DataflowExecutionContext.java  |  96 +
 .../dataflow/worker/DataflowOperationContext.java  |  31 +-
 .../runners/dataflow/worker/StackTraceUtil.java|  66 
 .../worker/DataflowExecutionStateTrackerTest.java  | 115 -
 .../util/common/worker/MapTaskExecutorTest.java|  28 +++--
 6 files changed, 336 insertions(+), 41 deletions(-)
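
The extension point is the empty reportBundleLull hook that the sampler invokes once a bundle's accumulated time crosses the report threshold (10 minutes by default, per BUNDLE_LULL_REPORT_MS in the diff below). A hedged sketch of what an override might do; the base class here is a simplified stand-in for ExecutionStateTracker, not its real API:

    // Simplified stand-in for ExecutionStateTracker, reduced to the new hook.
    abstract class TrackerBase {
      // Invoked from the sampling thread when millisSinceBundleStart exceeds
      // the next bundle-lull report threshold.
      protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {}
    }

    final class LoggingTracker extends TrackerBase {
      @Override
      protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {
        System.err.printf(
            "Thread %s has been processing the same bundle for %d ms%n",
            trackedThread.getName(), millisSinceBundleStart);
        // Dumping the tracked thread's stack is what makes lull reports useful.
        for (StackTraceElement frame : trackedThread.getStackTrace()) {
          System.err.println("  at " + frame);
        }
      }
    }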

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index dc6fd2f8248..f70e9ac16f9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,6 +46,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
   new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final long BUNDLE_LULL_REPORT_MS = 
TimeUnit.MINUTES.toMillis(10);
  private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
 
@@ -139,8 +140,17 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
*/
   private volatile long millisSinceLastTransition = 0;
 
+  /**
+   * The number of milliseconds since the {@link ExecutionStateTracker} entered its initial state.
+   *
+   * <p>This variable is updated by the Sampling thread and read by the Progress Reporting thread,
+   * hence it is marked volatile.
+   */
+  private volatile long millisSinceBundleStart = 0;
+
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
+  private long nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
 this.sampler = sampler;
@@ -239,13 +249,16 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
 return trackedThread;
   }
 
-  private synchronized void deactivate() {
+  @VisibleForTesting
+  public synchronized void deactivate() {
 sampler.removeTracker(this);
 Thread thread = this.trackedThread;
 if (thread != null) {
   CURRENT_TRACKERS.remove(thread.getId());
 }
 this.trackedThread = null;
+millisSinceBundleStart = 0;
+nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   public ExecutionState getCurrentState() {
@@ -294,6 +307,11 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
 return millisSinceLastTransition;
   }
 
+  /** Return the time since the bundle start. */
+  public long getMillisSinceBundleStart() {
+return millisSinceBundleStart;
+  }
+
   /** Return the number of transitions since the last sample. */
   public long getTransitionsAtLastSample() {
 return transitionsAtLastSample;
@@ -304,6 +322,12 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
 return nextLullReportMs;
   }
 
+  /** Return the duration since bundle start for the next bundle lull report. 
*/
+  @VisibleForTesting
+  public long getNextBundleLullDurationReportMs() {
+return nextBundleLullDurationReportMs;
+  }
+
   /**
* Called periodically by the {@link ExecutionStateSampler} to report time 
recorded by the
* tracker.
@@ -335,6 +359,21 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
   transitionsAtLastSample = transitionsAtThisSample;
 }
 updateMillisSinceLastTransition(millisSinceLastSample, state);
+updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  // Override this to implement bundle level lull reporting.
+  protected void reportBundleLull(Thread trackedThread, long 
millisSinceBundleStart) {}
+
+  // This suppression doesn't cause any race condition because it is updated 
by only one thread
+  // which is currently tracked.
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+millisSinceBundleStart += millisSinceLastSample;
+if (millisSinceBundleStart > nextBundleLullDurationReportMs) {
+  r

(beam) branch master updated: Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)

2024-04-26 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 15f7f02a79d Add UnboundedReaderMaxReadTimeMs to 
DataflowPipelineDebugOptions, deprecate UnboundedReaderMaxReadTimeSec (#31091)
15f7f02a79d is described below

commit 15f7f02a79d81786083b8cff5e69ed72b361145f
Author: Radosław Stankiewicz 
AuthorDate: Fri Apr 26 14:48:31 2024 +0200

Add UnboundedReaderMaxReadTimeMs to DataflowPipelineDebugOptions, deprecate 
UnboundedReaderMaxReadTimeSec (#31091)
---
 .../options/DataflowPipelineDebugOptions.java  | 27 +-
 .../dataflow/worker/WorkerCustomSources.java   |  5 ++--
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  6 ++---
 3 files changed, 31 insertions(+), 7 deletions(-)
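
The migration story, as a short hedged sketch: set the new millisecond option directly, or set nothing and let the InstanceFactory shown in the diff derive it from the deprecated seconds option (seconds times 1000):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReaderCheckpointTimeExample {
      public static void main(String[] args) {
        DataflowPipelineDebugOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineDebugOptions.class);
        // Prints 10000: the factory defaults to the deprecated
        // UnboundedReaderMaxReadTimeSec (default 10) times 1000.
        System.out.println(options.getUnboundedReaderMaxReadTimeMs());
        // Sub-second granularity was impossible with the old seconds option.
        options.setUnboundedReaderMaxReadTimeMs(500);
      }
    }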

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 30496dec296..7a5284151b9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -216,14 +216,39 @@ public interface DataflowPipelineDebugOptions
 
   void setReaderCacheTimeoutSec(Integer value);
 
-  /** The max amount of time an UnboundedReader is consumed before 
checkpointing. */
+  /**
+   * The max amount of time an UnboundedReader is consumed before 
checkpointing.
+   *
+   * @deprecated use {@link 
DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeMs()} instead
+   */
   @Description(
   "The max amount of time before an UnboundedReader is consumed before 
checkpointing, in seconds.")
+  @Deprecated
   @Default.Integer(10)
   Integer getUnboundedReaderMaxReadTimeSec();
 
   void setUnboundedReaderMaxReadTimeSec(Integer value);
 
+  /** The max amount of time an UnboundedReader is consumed before 
checkpointing. */
+  @Description(
+  "The max amount of time before an UnboundedReader is consumed before 
checkpointing, in millis.")
+  @Default.InstanceFactory(UnboundedReaderMaxReadTimeFactory.class)
+  Integer getUnboundedReaderMaxReadTimeMs();
+
+  void setUnboundedReaderMaxReadTimeMs(Integer value);
+
+  /**
+   * Sets Integer value based on old, deprecated field ({@link
+   * DataflowPipelineDebugOptions#getUnboundedReaderMaxReadTimeSec()}).
+   */
+  final class UnboundedReaderMaxReadTimeFactory implements DefaultValueFactory<Integer> {
+@Override
+public Integer create(PipelineOptions options) {
+  DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
+  return debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000;
+}
+  }
+
   /** The max elements read from an UnboundedReader before checkpointing. */
   @Description("The max elements read from an UnboundedReader before 
checkpointing. ")
   @Default.Integer(10 * 1000)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 8c086016ee9..b965110b3ef 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -796,9 +796,8 @@ public class WorkerCustomSources {
   this.context = context;
   this.started = started;
   DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
-  this.endTime =
-  Instant.now()
-  
.plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
+  long maxReadTimeMs = debugOptions.getUnboundedReaderMaxReadTimeMs();
+  this.endTime = Instant.now().plus(Duration.millis(maxReadTimeMs));
   this.maxElems = debugOptions.getUnboundedReaderMaxElements();
   this.backoffFactory =
   FluentBackoff.DEFAULT
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index d451ec093f7..cc9e6da4a73 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apach

(beam) branch master updated: Change caching of global window inputs to be guarded by experiment (#31013)

2024-04-18 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new bcb40cf4e4a Change caching of global window inputs to be guarded by 
experiment (#31013)
bcb40cf4e4a is described below

commit bcb40cf4e4a9b9045b51162edab09cf245456038
Author: Sam Whittle 
AuthorDate: Thu Apr 18 10:37:07 2024 +0200

Change caching of global window inputs to be guarded by experiment (#31013)

* Change caching of global window inputs to be guarded by experiment
disable_global_windowed_args_caching
---
 sdks/python/apache_beam/runners/common.pxd |  4 +-
 sdks/python/apache_beam/runners/common.py  | 75 --
 2 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd 
b/sdks/python/apache_beam/runners/common.pxd
index 9fb44af6377..683bf8fcac1 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -100,7 +100,9 @@ cdef class PerWindowInvoker(DoFnInvoker):
   cdef dict kwargs_for_process_batch
   cdef list placeholders_for_process_batch
   cdef bint has_windowed_inputs
-  cdef bint cache_globally_windowed_args
+  cdef bint recalculate_window_args
+  cdef bint has_cached_window_args
+  cdef bint has_cached_window_batch_args
   cdef object process_method
   cdef object process_batch_method
   cdef bint is_splittable
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 82ff939dbae..7a1cef4005e 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -761,6 +761,17 @@ class PerWindowInvoker(DoFnInvoker):
   self.current_window_index = None
   self.stop_window_index = None
 
+# TODO(https://github.com/apache/beam/issues/28776): Remove caching after
+# fully rolling out.
+# If true, always recalculate window args. If false, has_cached_window_args
+# and has_cached_window_batch_args will be set to true if the corresponding
+# self.args_for_process have been updated and should be reused directly.
+self.recalculate_window_args = (
+self.has_windowed_inputs or 'disable_global_windowed_args_caching' in
+RuntimeValueProvider.experiments)
+self.has_cached_window_args = False
+self.has_cached_window_batch_args = False
+
 # Try to prepare all the arguments that can just be filled in
 # without any additional work. in the process function.
 # Also cache all the placeholders needed in the process function.
@@ -921,16 +932,23 @@ class PerWindowInvoker(DoFnInvoker):
  additional_kwargs,
 ):
 # type: (...) -> Optional[SplitResultResidual]
-if self.has_windowed_inputs:
-  assert len(windowed_value.windows) <= 1
-  window, = windowed_value.windows
+if self.has_cached_window_args:
+  args_for_process, kwargs_for_process = (
+  self.args_for_process, self.kwargs_for_process)
 else:
-  window = GlobalWindow()
-side_inputs = [si[window] for si in self.side_inputs]
-side_inputs.extend(additional_args)
-args_for_process, kwargs_for_process = util.insert_values_in_args(
-self.args_for_process, self.kwargs_for_process,
-side_inputs)
+  if self.has_windowed_inputs:
+assert len(windowed_value.windows) <= 1
+window, = windowed_value.windows
+  else:
+window = GlobalWindow()
+  side_inputs = [si[window] for si in self.side_inputs]
+  side_inputs.extend(additional_args)
+  args_for_process, kwargs_for_process = util.insert_values_in_args(
+  self.args_for_process, self.kwargs_for_process, side_inputs)
+  if not self.recalculate_window_args:
+self.args_for_process, self.kwargs_for_process = (
+args_for_process, kwargs_for_process)
+self.has_cached_window_args = True
 
 # Extract key in the case of a stateful DoFn. Note that in the case of a
 # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
@@ -1012,20 +1030,29 @@ class PerWindowInvoker(DoFnInvoker):
   ):
 # type: (...) -> Optional[SplitResultResidual]
 
-if self.has_windowed_inputs:
-  assert isinstance(windowed_batch, HomogeneousWindowedBatch)
-  assert len(windowed_batch.windows) <= 1
-  window, = windowed_batch.windows
+if self.has_cached_window_batch_args:
+  args_for_process_batch, kwargs_for_process_batch = (
+  self.args_for_process_batch, self.kwargs_for_process_batch)
 else:
-  window = GlobalWindow()
-side_inputs = [si[window] for si in self.side_inputs]
-side_inputs.extend(additional_args)
-(args_for_process_batch, kwargs_for_process_batch) = (
-util.insert
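
The gating above computes the window's side-input arguments once and reuses them on later invocations unless the experiment turns caching off. A minimal Java sketch of that gate, with illustrative names rather than the Python invoker's API:

    import java.util.Set;
    import java.util.function.Supplier;

    // Sketch: compute-once argument caching gated by an experiment flag.
    final class WindowArgsCache<T> {
      private final boolean recalculateAlways; // true when caching is disabled
      private boolean hasCached = false;
      private T cached;

      WindowArgsCache(Set<String> experiments) {
        // Mirrors the disable_global_windowed_args_caching experiment above.
        this.recalculateAlways =
            experiments.contains("disable_global_windowed_args_caching");
      }

      T get(Supplier<T> compute) {
        if (hasCached) {
          return cached; // reuse the previously prepared arguments
        }
        T args = compute.get();
        if (!recalculateAlways) {
          cached = args;
          hasCached = true;
        }
        return args;
      }
    }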

(beam) branch master updated (1e89f834c84 -> 1d8fdfa83ca)

2024-04-16 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 1e89f834c84 Follow up of recent GHA breakages (#30996)
 add 1d8fdfa83ca Remove caching of global window side input param when 
invoking. (#30991)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/runners/common.py | 70 +--
 1 file changed, 20 insertions(+), 50 deletions(-)



(beam) branch master updated: Fix ConcurrentModification exception possible in DataflowExecutionStateSampler (#30993)

2024-04-16 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f6322dae132 Fix ConcurrentModification exception possible in 
DataflowExecutionStateSampler (#30993)
f6322dae132 is described below

commit f6322dae13230cd3e15da3cae7ac2158b5efc7fb
Author: Sam Whittle 
AuthorDate: Tue Apr 16 19:16:34 2024 +0200

Fix ConcurrentModification exception possible in 
DataflowExecutionStateSampler (#30993)

Also ensure that the result is not modified by observing it. Previously we
were merging into the completed map on each observation, which appeared unintended.
---
 .../dataflow/worker/DataflowExecutionStateSampler.java | 14 --
 .../dataflow/worker/DataflowExecutionStateSamplerTest.java |  3 +++
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
index 1ff9a9be40d..80955185eea 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
@@ -39,6 +39,7 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
 
   private final ConcurrentHashMap 
activeTrackersByWorkId =
   new ConcurrentHashMap<>();
+  // The maps within completedProcessingMetrics should not be modified.
   private final ConcurrentHashMap>
   completedProcessingMetrics = new ConcurrentHashMap<>();
 
@@ -64,7 +65,7 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
 this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
   }
 
-  private static Map mergeStepStatsMaps(
+  private static void mergeStepStatsMaps(
   Map map1, Map map2) {
 for (Entry steps : map2.entrySet()) {
   map1.compute(
@@ -77,7 +78,6 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
 return v;
   });
 }
-return map1;
   }
 
   @Override
@@ -118,13 +118,15 @@ public final class DataflowExecutionStateSampler extends 
ExecutionStateSampler {
   }
 
   public Map 
getProcessingDistributionsForWorkId(String workId) {
+Map result;
 DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
 if (tracker == null) {
-  return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
+  result = new HashMap<>();
+} else {
+  result = tracker.getProcessingTimesByStepCopy();
 }
-return mergeStepStatsMaps(
-completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
-tracker.getProcessingTimesByStepCopy());
+mergeStepStatsMaps(result, completedProcessingMetrics.getOrDefault(workId, 
new HashMap<>()));
+return result;
   }
 
   public void resetForWorkId(String workId) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index 920e37d40ec..ab5059bd937 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -106,6 +106,9 @@ public class DataflowExecutionStateSamplerTest {
 assertThat(sampler.getActiveMessageMetadataForWorkId(workId).get(), 
equalTo(testMetadata));
 assertThat(
 sampler.getProcessingDistributionsForWorkId(workId), 
equalTo(testCompletedProcessingTimes));
+// Repeated calls should not modify the result.
+assertThat(
+sampler.getProcessingDistributionsForWorkId(workId), 
equalTo(testCompletedProcessingTimes));
   }
 
   @Test
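
The fix holds because the method now merges the completed metrics into a caller-owned copy of the active tracker's times, so the stored maps are only ever read. A simplified Java sketch of that copy-then-merge shape, with Long standing in for the real per-step statistics type:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: always merge into a fresh map; 'completed' is never mutated.
    static Map<String, Long> distributionsFor(
        Map<String, Long> activeCopy, Map<String, Long> completed) {
      Map<String, Long> result = new HashMap<>(activeCopy);
      completed.forEach((step, millis) -> result.merge(step, millis, Long::sum));
      return result; // repeated calls observe identical values
    }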



(beam) branch master updated: Improvements to GetWorkTimingInfosTracker when there is clock skew between the worker and service. (#30990)

2024-04-16 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new a6f3ddf898c Improvements to GetWorkTimingInfosTracker when there is 
clock skew between the worker and service. (#30990)
a6f3ddf898c is described below

commit a6f3ddf898cbdc7d3114d80ab6fff87762f8fb9e
Author: Sam Whittle 
AuthorDate: Tue Apr 16 18:30:09 2024 +0200

Improvements to GetWorkTimingInfosTracker when there is clock skew between 
the worker and service. (#30990)

- remove spammy log
- use service provided timestamps when scaling
---
 .../client/grpc/GetWorkTimingInfosTracker.java | 25 ++-
 .../client/grpc/GrpcWindmillServerTest.java| 51 +++---
 2 files changed, 59 insertions(+), 17 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
index dc3486d743a..8e70ef03158 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
@@ -109,7 +109,7 @@ final class GetWorkTimingInfosTracker {
 Instant forwardedByDispatcherTiming =
 getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
 Instant now = Instant.ofEpochMilli(clock.getMillis());
-if (forwardedByDispatcherTiming != null) {
+if (forwardedByDispatcherTiming != null && 
now.isAfter(forwardedByDispatcherTiming)) {
   Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
   aggregatedGetWorkStreamLatencies.compute(
   State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
@@ -134,22 +134,23 @@ final class GetWorkTimingInfosTracker {
 if (workItemCreationLatency != null) {
   latencyAttributions.add(workItemCreationLatency);
 }
-if 
(workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
-  LOG.warn(
-  "Work item creation time {} is after the work received time {}, "
-  + "one or more GetWorkStream timing infos are missing.",
-  workItemCreationEndTime,
-  workItemLastChunkReceivedByWorkerTime);
-  return latencyAttributions;
-}
-long totalTransmissionDurationElapsedTime =
-new Duration(workItemCreationEndTime, 
workItemLastChunkReceivedByWorkerTime).getMillis();
 long totalSumDurationTimeMills = 0;
 for (SumAndMaxDurations duration : 
aggregatedGetWorkStreamLatencies.values()) {
   totalSumDurationTimeMills += duration.sum.getMillis();
 }
 final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;
-
+long totalTransmissionDurationElapsedTime;
+if 
(workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+  LOG.debug(
+  "Work item creation time {} is after the work received time {}, "
+  + "one or more GetWorkStream timing infos are missing. Using raw 
times without scaling.",
+  workItemCreationEndTime,
+  workItemLastChunkReceivedByWorkerTime);
+  totalTransmissionDurationElapsedTime = finalTotalSumDurationTimeMills;
+} else {
+  totalTransmissionDurationElapsedTime =
+  new Duration(workItemCreationEndTime, 
workItemLastChunkReceivedByWorkerTime).getMillis();
+}
 aggregatedGetWorkStreamLatencies.forEach(
 (state, duration) -> {
   long scaledDuration =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 454c616db41..fe0822a6067 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -17,11 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.InputStream;
 import java.io.Seque
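
The scaling referenced in the commit message distributes the observed end-to-end transmission time across the per-state latencies in proportion to their measured durations; when clock skew makes the elapsed window invalid (creation time after receive time), the raw sums are used unscaled. A hedged Java sketch of that proportional scaling, simplified from the tracker's logic:

    // Sketch: scale one state's duration so all states sum to the elapsed
    // transmission time; fall back to raw durations when skew is detected.
    static long scaledMillis(
        long stateSumMillis, long totalSumMillis, long elapsedMillis, boolean skewDetected) {
      if (totalSumMillis == 0) {
        return 0;
      }
      long target = skewDetected ? totalSumMillis : elapsedMillis;
      return Math.round(stateSumMillis * ((double) target / totalSumMillis));
    }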

(beam) branch master updated: Correct per-entry HashMap overhead in WindmillStateCache (#30672)

2024-04-10 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 89aeb187f2c Correct per-entry HashMap overhead in WindmillStateCache 
(#30672)
89aeb187f2c is described below

commit 89aeb187f2c9350fa51fc2b2f690c93a57e523b9
Author: dmitryor <34167644+dmitr...@users.noreply.github.com>
AuthorDate: Wed Apr 10 01:28:23 2024 -0700

Correct per-entry HashMap overhead in WindmillStateCache (#30672)
---
 .../worker/windmill/state/WindmillStateCache.java  |  3 ++-
 .../worker/windmill/state/WindmillStateCacheTest.java  | 12 ++--
 .../windmill/state/WindmillStateInternalsTest.java | 18 +-
 3 files changed, 17 insertions(+), 16 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index 85c74fe8591..c6c49134bcb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -64,7 +64,8 @@ public class WindmillStateCache implements StatusDataProvider 
{
   // Initial size of hash tables per entry.
   private static final int INITIAL_HASH_MAP_CAPACITY = 4;
   // Overhead of each hash map entry.
-  private static final int HASH_MAP_ENTRY_OVERHEAD = 16;
+  // https://appsintheopen.com/posts/52-the-memory-overhead-of-java-ojects
+  private static final int HASH_MAP_ENTRY_OVERHEAD = 32;
   // Overhead of each StateCacheEntry.  One long, plus a hash table.
   private static final int PER_CACHE_ENTRY_OVERHEAD =
   8 + HASH_MAP_ENTRY_OVERHEAD * INITIAL_HASH_MAP_CAPACITY;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index 1f4355b156b..446a34f73de 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -168,15 +168,15 @@ public class WindmillStateCacheTest {
 
 assertEquals(0, cache.getWeight());
 keyCache.persist();
-assertEquals(254, cache.getWeight());
+assertEquals(414, cache.getWeight());
 
 keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag3"), new 
TestState("t3"), 2);
 keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag2"), new 
TestState("t2"), 2);
 
 // Observes updated weight in entries, though cache will not know about it.
-assertEquals(290, cache.getWeight());
+assertEquals(482, cache.getWeight());
 keyCache.persist();
-assertEquals(290, cache.getWeight());
+assertEquals(482, cache.getWeight());
 
 keyCache =
 cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
2L).forFamily(STATE_FAMILY);
@@ -212,7 +212,7 @@ public class WindmillStateCacheTest {
 
 keyCache =
 cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 
2L).forFamily(STATE_FAMILY);
-assertEquals(127, cache.getWeight());
+assertEquals(207, cache.getWeight());
 assertEquals(
 Optional.of(new TestState("g1")),
 keyCache.get(StateNamespaces.global(), new TestStateTag("tag1")));
@@ -221,7 +221,7 @@ public class WindmillStateCacheTest {
 cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 1L, 
3L).forFamily(STATE_FAMILY);
 assertEquals(
 Optional.empty(), keyCache.get(StateNamespaces.global(), new 
TestStateTag("tag1")));
-assertEquals(127, cache.getWeight());
+assertEquals(207, cache.getWeight());
   }
 
   /** Verifies that the cache is invalidated when the cache token changes. */
@@ -254,7 +254,7 @@ public class WindmillStateCacheTest {
 assertEquals(Optional.of(new TestState("w2")), 
keyCache.get(windowNamespace(0), tag));
 assertEquals(0, cache.getWeight());
 keyCache.persist();
-assertEquals(127, cache.getWeight());
+assertEquals(207, cache.getWeight());
 assertEquals(Optional.of(new TestState("w2")), 
keyCache.get(windowNamespace(0), tag));
 
 // Previous work token.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/s

(beam) branch master updated: Tune maximum thread count for streaming dataflow worker executor dynamically. (#30439)

2024-04-05 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new f437a783e19 Tune maximum thread count for streaming dataflow worker 
executor dynamically. (#30439)
f437a783e19 is described below

commit f437a783e191592917bd417495bc6317142e6a43
Author: Melody Shen <11644048+melodys...@users.noreply.github.com>
AuthorDate: Fri Apr 5 02:12:23 2024 -0700

Tune maximum thread count for streaming dataflow worker executor 
dynamically. (#30439)

Workers will read the StreamingScalingReportResponse from worker messages 
and
configure the executor pool size based on the specified value.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../dataflow/worker/DataflowWorkUnitClient.java|  10 +-
 .../runners/dataflow/worker/WorkUnitClient.java|   4 +-
 .../harness/StreamingWorkerStatusReporter.java |  57 -
 .../dataflow/worker/util/BoundedQueueExecutor.java |  76 --
 .../worker/DataflowWorkUnitClientTest.java |  13 +-
 .../harness/StreamingWorkerStatusReporterTest.java | 100 
 .../worker/util/BoundedQueueExecutorTest.java  | 268 +
 8 files changed, 503 insertions(+), 27 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 8be8d73fbcb..d4a16325641 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -732,7 +732,7 @@ class BeamModulePlugin implements Plugin {
 google_api_common   : 
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
 google_api_services_bigquery: 
"com.google.apis:google-api-services-bigquery:v2-rev20240124-2.0.0",  // 
[bomupgrader] sets version
 google_api_services_cloudresourcemanager: 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240128-2.0.0",
  // [bomupgrader] sets version
-google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20240113-$google_clients_version",
+google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20240218-$google_clients_version",
 google_api_services_healthcare  : 
"com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
 google_api_services_pubsub  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
 google_api_services_storage : 
"com.google.apis:google-api-services-storage:v1-rev20240205-2.0.0",  // 
[bomupgrader] sets version
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index f3caa8d0f3a..af8e7dd50c9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -39,6 +39,7 @@ import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import com.google.api.services.dataflow.model.WorkerMessage;
+import com.google.api.services.dataflow.model.WorkerMessageResponse;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -312,7 +313,8 @@ class DataflowWorkUnitClient implements WorkUnitClient {
* perworkermetrics with this path.
*/
   @Override
-  public void reportWorkerMessage(List messages) throws 
IOException {
+  public List reportWorkerMessage(List 
messages)
+  throws IOException {
 SendWorkerMessagesRequest request =
 new SendWorkerMessagesRequest()
 .setLocation(options.getRegion())
@@ -327,6 +329,10 @@ class DataflowWorkUnitClient implements WorkUnitClient {
   logger.warn("Worker Message response is null");
   throw new IOException("Got null Worker Message response");
 }
-// Currently no response is expected
+if (result.getWorkerMessageResponses() == null) {
+  logger.debug("Worker Message response is empty.");
+  return Collections.emptyList();
+}
+return result.getWorkerMessageResponses();
   }
 }
diff --git 
a/runners/google-clo
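
For scale: the derived constant in the same file, PER_CACHE_ENTRY_OVERHEAD = 8 + HASH_MAP_ENTRY_OVERHEAD * INITIAL_HASH_MAP_CAPACITY, grows from 8 + 16 * 4 = 72 bytes to 8 + 32 * 4 = 136 bytes per entry with this change, which is why the expected cache weights in the tests above increase.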

(beam) branch master updated (50f33cd786d -> c1c255a0a43)

2024-03-19 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 50f33cd786d [Python] Check feature store existence at pipeline 
construction time (#30668)
 add c1c255a0a43 Cache and close windmill grpc channels (#30425)

No new revisions were added by this update.

Summary of changes:
 .../google-cloud-dataflow-java/worker/build.gradle |  12 ++
 .../dataflow/worker/StreamingDataflowWorker.java   |  11 +-
 .../worker/windmill/WindmillConnection.java|   5 +
 .../windmill/client/grpc/GrpcDispatcherClient.java |   4 +-
 .../windmill/client/grpc/GrpcWindmillServer.java   |  29 +++-
 .../client/grpc/StreamingEngineClient.java |  25 ++--
 .../windmill/client/grpc/stubs/ChannelCache.java   | 117 
 ...y.java => ChannelCachingRemoteStubFactory.java} |  38 +++---
 ...Factory.java => ChannelCachingStubFactory.java} |  24 ++--
 .../client/grpc/stubs/IsolationChannel.java|   2 +-
 .../client/grpc/stubs/WindmillChannelFactory.java  |   2 +-
 .../client/grpc/GrpcWindmillServerTest.java|   3 +-
 .../client/grpc/StreamingEngineClientTest.java |  31 ++---
 .../client/grpc/WindmillStreamSenderTest.java  |   3 +-
 .../client/grpc/stubs/ChannelCacheTest.java| 150 +
 .../windmill/testing/FakeWindmillStubFactory.java  |  27 ++--
 .../budget/EvenGetWorkBudgetDistributorTest.java   |   7 +-
 17 files changed, 403 insertions(+), 87 deletions(-)
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java
 rename 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/{RemoteWindmillStubFactory.java
 => ChannelCachingRemoteStubFactory.java} (67%)
 copy 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/{WindmillStubFactory.java
 => ChannelCachingStubFactory.java} (58%)
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java
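
A plausible shape for such a cache, sketched with Guava and gRPC as an illustration of the cache-and-close-on-eviction idea rather than the actual ChannelCache implementation:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalNotification;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    // Sketch: one channel per target, shut down when evicted from the cache.
    final class ChannelCacheSketch {
      private final LoadingCache<String, ManagedChannel> channels =
          CacheBuilder.newBuilder()
              .removalListener(
                  (RemovalNotification<String, ManagedChannel> n) -> n.getValue().shutdown())
              .build(
                  CacheLoader.from(
                      target -> ManagedChannelBuilder.forTarget(target).usePlaintext().build()));

      ManagedChannel get(String target) {
        return channels.getUnchecked(target);
      }

      void shutdownAll() {
        channels.invalidateAll(); // removal listener shuts each channel down
      }
    }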



(beam) branch master updated: Revert "Implementing lull reporting at bundle level processing (#29882)" (#30648)

2024-03-18 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 50c59912bc0 Revert "Implementing lull reporting at bundle level 
processing (#29882)" (#30648)
50c59912bc0 is described below

commit 50c59912bc002947c335170b42827b278b78aae1
Author: Arvind Ram 
AuthorDate: Mon Mar 18 03:31:49 2024 -0700

Revert "Implementing lull reporting at bundle level processing (#29882)" 
(#30648)

This reverts commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf.
---
 .../core/metrics/ExecutionStateTracker.java|  25 
 .../dataflow/worker/DataflowExecutionContext.java  | 117 +
 .../dataflow/worker/DataflowOperationContext.java  |  80 +++-
 .../runners/dataflow/worker/StackTraceUtil.java|  66 --
 .../worker/DataflowExecutionStateTrackerTest.java  | 140 +
 .../worker/DataflowOperationContextTest.java   |  80 ++--
 6 files changed, 155 insertions(+), 353 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index b0b8f0107f3..dc6fd2f8248 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,7 +46,6 @@ public class ExecutionStateTracker implements 
Comparable
   new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
-  private static final long BUNDLE_LULL_REPORT_MS = 
TimeUnit.MINUTES.toMillis(10);
   private static final AtomicIntegerFieldUpdater 
SAMPLING_UPDATER =
   AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, 
"sampling");
 
@@ -140,17 +139,8 @@ public class ExecutionStateTracker implements 
Comparable
*/
   private volatile long millisSinceLastTransition = 0;
 
-  /**
-   * The number of milliseconds since the {@link ExecutionStateTracker} 
initial state.
-   *
-   * This variable is updated by the Sampling thread, and read by the 
Progress Reporting thread,
-   * thus it being marked volatile.
-   */
-  private volatile long millisSinceBundleStart = 0;
-
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
-  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
 this.sampler = sampler;
@@ -165,10 +155,8 @@ public class ExecutionStateTracker implements 
Comparable
 currentState = null;
 numTransitions = 0;
 millisSinceLastTransition = 0;
-millisSinceBundleStart = 0;
 transitionsAtLastSample = 0;
 nextLullReportMs = LULL_REPORT_MS;
-nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   @VisibleForTesting
@@ -347,19 +335,6 @@ public class ExecutionStateTracker implements 
Comparable
   transitionsAtLastSample = transitionsAtThisSample;
 }
 updateMillisSinceLastTransition(millisSinceLastSample, state);
-updateMillisSinceBundleStart(millisSinceLastSample);
-  }
-
-  // Override this to implement bundle level lull reporting.
-  protected void reportBundleLull(long millisSinceBundleStart) {}
-
-  @SuppressWarnings("NonAtomicVolatileUpdate")
-  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
-millisSinceBundleStart += millisSinceLastSample;
-if (millisSinceBundleStart > nextBundleLullReportMs) {
-  reportBundleLull(millisSinceBundleStart);
-  nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
-}
   }
 
   @SuppressWarnings("NonAtomicVolatileUpdate")
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 16ff2975b02..080fa7c9dac 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
@@ -30,8 +29,6 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import j

(beam) branch master updated: Refactor commit logic out of StreamingDataflowWorker (#30312)

2024-03-15 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c298da550f8 Refactor commit logic out of StreamingDataflowWorker 
(#30312)
c298da550f8 is described below

commit c298da550f88b1fd489bc25c08a40d484833e428
Author: martin trieu 
AuthorDate: Fri Mar 15 02:01:18 2024 -0700

Refactor commit logic out of StreamingDataflowWorker (#30312)
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 237 +++-
 .../dataflow/worker/WindmillComputationKey.java|   5 +
 .../client/CloseableStream.java}   |  30 +-
 .../worker/windmill/client/WindmillStreamPool.java |   7 +
 .../client/commits}/Commit.java|  10 +-
 .../windmill/client/commits/CompleteCommit.java|  67 +
 .../commits/StreamingApplianceWorkCommitter.java   | 167 +++
 .../commits/StreamingEngineWorkCommitter.java  | 233 
 .../windmill/client/commits/WorkCommitter.java |  54 
 .../worker/windmill/state/WindmillStateCache.java  |   5 +
 .../dataflow/worker/FakeWindmillServer.java|  32 ++-
 .../worker/StreamingDataflowWorkerTest.java|   2 +-
 .../StreamingApplianceWorkCommitterTest.java   | 140 ++
 .../commits/StreamingEngineWorkCommitterTest.java  | 308 +
 14 files changed, 1077 insertions(+), 220 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6f1bb0847bc..4c3ffd08a0b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -87,17 +87,14 @@ import 
org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
 import 
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
-import org.apache.beam.runners.dataflow.worker.streaming.Commit;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
 import 
org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
 import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
-import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
 import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
-import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
@@ -110,9 +107,13 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribut
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
@@ -217,9 +218,6 @@ public class StreamingDataflowWorker {
   final WindmillStateCache stateCache;
   // Maps from computation ids to per
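
The extraction gives commit handling a narrow seam: the worker produces commits and a WorkCommitter (with streaming-appliance and streaming-engine implementations) owns batching and sending them. A hypothetical minimal version of that seam; the method names below are assumptions for illustration, not the actual interface:

    // Sketch of the extracted committer seam (hypothetical methods).
    interface WorkCommitterSketch<CommitT> {
      void start();                // start background commit streams/threads
      void commit(CommitT commit); // enqueue one completed work item's commit
      void stop();                 // flush pending commits and shut down
    }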

(beam) branch master updated: Fix flaky streaming dataflow tests (#30572)

2024-03-14 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 14d25c3da76 Fix flaky streaming dataflow tests (#30572)
14d25c3da76 is described below

commit 14d25c3da76e3864e746a1a13e0896e861fab607
Author: martin trieu 
AuthorDate: Thu Mar 14 02:16:39 2024 -0700

Fix flaky streaming dataflow tests (#30572)

* remove waiting/sleeping arbitrarily in tests since it is leading to 
flakiness
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 +-
 .../worker/windmill/state/WindmillStateCache.java  | 30 +---
 .../worker/StreamingModeExecutionContextTest.java  |  2 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  3 +-
 .../windmill/state/WindmillStateCacheTest.java |  2 +-
 .../windmill/state/WindmillStateInternalsTest.java |  2 +-
 .../work/budget/GetWorkBudgetRefresherTest.java| 84 ++
 .../refresh/DispatchedActiveWorkRefresherTest.java | 31 +---
 8 files changed, 101 insertions(+), 57 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 7bc186af445..6f1bb0847bc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -476,7 +476,7 @@ public class StreamingDataflowWorker {
 computationId -> 
Optional.ofNullable(computationMap.get(computationId,
 clientId,
 computationMap,
-new WindmillStateCache(options.getWorkerCacheMb()),
+WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()),
 createWorkUnitExecutor(options),
 IntrinsicMapTaskExecutorFactory.defaultFactory(),
 new DataflowWorkUnitClient(options, LOG),
@@ -502,7 +502,7 @@ public class StreamingDataflowWorker {
   Supplier clock,
   Function executorSupplier) {
 BoundedQueueExecutor boundedQueueExecutor = 
createWorkUnitExecutor(options);
-WindmillStateCache stateCache = new 
WindmillStateCache(options.getWorkerCacheMb());
+WindmillStateCache stateCache = 
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
 computationMap.putAll(
 createComputationMapForTesting(mapTasks, boundedQueueExecutor, 
stateCache::forComputation));
 return new StreamingDataflowWorker(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index ba59d1ae814..0d4e7c6b645 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -42,7 +42,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Precondit
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MapMaker;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -56,6 +55,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * thread at a time, so this is safe.
  */
 public class WindmillStateCache implements StatusDataProvider {
+  private static final int STATE_CACHE_CONCURRENCY_LEVEL = 4;
   // Convert Megabytes to bytes
   private static final long MEGABYTES = 1024 * 1024;
   // Estimate of overhead per StateId.
@@ -72,20 +72,28 @@ public class WindmillStateCache implements 
StatusDataProvider {
   // Contains the current valid ForKey object. Entries in the cache are keyed 
by ForKey with pointer
   // equality so entries may be invalidated by creating a new key object, 
rendering the previous
   // entries inaccessible. They will be evicted through normal cache operation.
-  private final ConcurrentMap keyIndex =
-  new MapMaker().weakValues().concurrencyLevel(4).makeMap();
+  private final ConcurrentMap keyIndex;
   private final long workerCacheBytes; // Copy workerCacheMb and convert to 
bytes.
 
-  public WindmillStateCache(long workerCach
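
The constructor-to-factory change gives call sites a named entry point and one place to centralize sizing logic. A minimal sketch of the factory shape, simplified from the real class:

    // Sketch: named static factory replacing 'new WindmillStateCache(mb)'.
    public final class StateCacheSketch {
      private final long cacheBytes;

      private StateCacheSketch(long sizeMb) {
        this.cacheBytes = sizeMb * 1024 * 1024; // convert MB to bytes once, here
      }

      public static StateCacheSketch ofSizeMbs(long sizeMb) {
        return new StateCacheSketch(sizeMb);
      }
    }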

(beam) branch master updated: Add last error time to stream error message (#30476)

2024-03-04 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 4ad8d530916 Add last error time to stream error message (#30476)
4ad8d530916 is described below

commit 4ad8d530916a590e91d8b091291cfa2eaefec029
Author: Arun Pandian 
AuthorDate: Mon Mar 4 02:20:52 2024 -0800

Add last error time to stream error message (#30476)
---
 .../worker/windmill/client/AbstractWindmillStream.java | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index baafc22e030..028a5c2e1d4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -37,6 +37,7 @@ import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
  * synchronizing on this.
  */
 public abstract class AbstractWindmillStream implements 
WindmillStream {
+
   public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
@@ -74,6 +76,7 @@ public abstract class AbstractWindmillStream implements Win
   private final AtomicLong lastResponseTimeMs;
   private final AtomicInteger errorCount;
   private final AtomicReference lastError;
+  private final AtomicReference lastErrorTime;
   private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set> streamRegistry;
@@ -105,6 +108,7 @@ public abstract class AbstractWindmillStream implements Win
 this.lastResponseTimeMs = new AtomicLong();
 this.errorCount = new AtomicInteger();
 this.lastError = new AtomicReference<>();
+this.lastErrorTime = new AtomicReference<>();
 this.sleepUntil = new AtomicLong();
 this.finishLatch = new CountDownLatch(1);
 this.requestObserverSupplier =
@@ -210,7 +214,9 @@ public abstract class AbstractWindmillStream implements Win
   public final void appendSummaryHtml(PrintWriter writer) {
 appendSpecificHtml(writer);
 if (errorCount.get() > 0) {
-  writer.format(", %d errors, last error [ %s ]", errorCount.get(), 
lastError.get());
+  writer.format(
+  ", %d errors, last error [ %s ] at [%s]",
+  errorCount.get(), lastError.get(), lastErrorTime.get());
 }
 if (clientClosed.get()) {
   writer.write(", client closed");
@@ -250,6 +256,7 @@ public abstract class AbstractWindmillStream implements Win
   }
 
   private class ResponseObserver implements StreamObserver {
+
 @Override
 public void onNext(ResponseT response) {
   try {
@@ -285,7 +292,7 @@ public abstract class AbstractWindmillStream implements Win
   status = ((StatusRuntimeException) t).getStatus();
 }
 String statusError = status == null ? "" : status.toString();
-lastError.set(statusError);
+setLastError(statusError);
 if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
   long nowMillis = Instant.now().getMillis();
   String responseDebug;
@@ -325,9 +332,14 @@ public abstract class AbstractWindmillStream implements Win
 "Stream completed successfully but did not complete requested 
operations, "
 + "recreating";
 LOG.warn(error);
-lastError.set(error);
+setLastError(error);
   }
   executor.execute(AbstractWindmillStream.this::startStream);
 }
   }
+
+  private void setLastError(String error) {
+lastError.set(error);
+lastErrorTime.set(DateTime.now());
+  }
 }



(beam) branch master updated: Make windmill service stream max backoff configurable (#30475)

2024-03-04 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 770c6fe39cc Make windmill service stream max backoff configurable 
(#30475)
770c6fe39cc is described below

commit 770c6fe39cc90b19a0087d42dc9bb39c8e614cc6
Author: Arun Pandian 
AuthorDate: Mon Mar 4 02:20:29 2024 -0800

Make windmill service stream max backoff configurable (#30475)
---
 .../runners/dataflow/options/DataflowStreamingPipelineOptions.java  | 6 ++
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java   | 2 +-
 .../dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java| 3 +--
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index d02155d8ce3..e8396c02726 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -205,6 +205,12 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setWindmillServiceRpcChannelAliveTimeoutSec(int value);
 
+  @Description("Max backoff with which the windmill service stream failures 
are retried")
+  @Default.Integer(30 * 1000) // 30s
+  int getWindmillServiceStreamMaxBackoffMillis();
+
+  void setWindmillServiceStreamMaxBackoffMillis(int value);
+
   /**
* Factory for creating local Windmill address. Reads from system property 
'windmill.hostport' for
* backwards compatibility.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6df500eed3e..7bc186af445 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -614,7 +614,7 @@ public class StreamingDataflowWorker {
 Duration maxBackoff =
 !options.isEnableStreamingEngine() && 
options.getLocalWindmillHostport() != null
 ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
-: GrpcWindmillServer.MAX_BACKOFF;
+: 
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
 GrpcWindmillStreamFactory windmillStreamFactory =
 GrpcWindmillStreamFactory.of(
 JobHeader.newBuilder()
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index d6944b36034..b09e341f29e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -88,7 +88,6 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("nullness") // 
TODO(https://github.com/apache/beam/issues/20497
 public final class GrpcWindmillServer extends WindmillServerStub {
   public static final Duration LOCALHOST_MAX_BACKOFF = Duration.millis(500);
-  public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
   private static final Duration MIN_BACKOFF = Duration.millis(1);
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcWindmillServer.class);
   private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
@@ -113,7 +112,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   Consumer> 
processHeartbeatResponses) {
 this.options = options;
 this.throttleTimers = StreamingEngineThrottleTimers.create();
-this.maxBackoff = MAX_BACKOFF;
+this.maxBackoff = 
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
 this.dispatcherClient = grpcDispatcherClient;
 this.syncApplianceStub = null;
 this.sendKeyedGetDataRequests =
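
With the option defined on DataflowStreamingPipelineOptions, the cap can be set like any other pipeline option; for example (the 60s value here is illustrative):

    import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BackoffOptionExample {
      public static void main(String[] args) {
        // Example: raise the stream-retry backoff cap from the 30s default to 60s.
        DataflowStreamingPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowStreamingPipelineOptions.class);
        options.setWindmillServiceStreamMaxBackoffMillis(60 * 1000);
      }
    }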



(beam) branch master updated (935ca9c58d6 -> be1526b7b57)

2024-03-04 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 935ca9c58d6 fix alerting path for grafana provisioning (#30464)
 add be1526b7b57 Register streaming specific options with 
DataflowPipelineRegistrar (#30474)

No new revisions were added by this update.

Summary of changes:
 .../beam/runners/dataflow/DataflowRunner.java  |   3 +-
 .../options/DataflowPipelineDebugOptions.java  | 111 -
 .../dataflow/options/DataflowPipelineOptions.java  |   1 +
 .../options/DataflowStreamingPipelineOptions.java} | 134 +++--
 .../dataflow/worker/StreamingDataflowWorker.java   |  25 ++--
 .../worker/streaming/sideinput/SideInputCache.java |   4 +-
 .../streaming/sideinput/SideInputStateFetcher.java |   4 +-
 .../windmill/client/grpc/ChannelzServlet.java  |   6 +-
 .../windmill/client/grpc/GrpcWindmillServer.java   |  20 +--
 .../worker/DataflowWorkerHarnessHelperTest.java|   9 +-
 .../worker/StreamingDataflowWorkerTest.java|  40 +++---
 .../dataflow/worker/status/DebugCaptureTest.java   |   8 +-
 .../sideinput/SideInputStateFetcherTest.java   |   8 +-
 .../windmill/client/grpc/ChannelzServletTest.java  |  10 +-
 14 files changed, 192 insertions(+), 191 deletions(-)
 rename 
runners/google-cloud-dataflow-java/{worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
 => 
src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java}
 (56%)



(beam) branch master updated (2ae4a28e6fc -> 6c3e8ad1af1)

2024-02-27 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 2ae4a28e6fc Force downgrade mpmath to avoid test breakages (#30418)
 add 6c3e8ad1af1 [Dataflow Streaming] Start to refactor persistence layer 
to prepare for direct path (#30265)

No new revisions were added by this update.

Summary of changes:
 .../dataflow/worker/StreamingDataflowWorker.java   | 278 -
 .../streaming/sideinput/SideInputStateFetcher.java |  23 +-
 .../worker/windmill/state/WindmillStateReader.java |  31 +--
 .../worker/StreamingDataflowWorkerTest.java|   6 +-
 .../sideinput/SideInputStateFetcherTest.java   |  14 +-
 .../windmill/state/WindmillStateReaderTest.java|   8 +-
 6 files changed, 195 insertions(+), 165 deletions(-)



(beam) branch master updated: Remove some uses of ClassLoadingStrategy.Default.INJECTION (#30367)

2024-02-26 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6406cfe3c9b Remove some uses of ClassLoadingStrategy.Default.INJECTION 
(#30367)
6406cfe3c9b is described below

commit 6406cfe3c9b613e3a5aef334c458d7e20c5502c2
Author: Sam Whittle 
AuthorDate: Mon Feb 26 21:34:27 2024 +0100

Remove some uses of ClassLoadingStrategy.Default.INJECTION (#30367)
---
 .../java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java | 10 +-
 .../main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java |  3 +--
 .../sdk/extensions/avro/schemas/utils/AvroByteBuddyUtils.java  |  5 +++--
 .../org/apache/beam/sdk/io/aws2/schemas/AwsSchemaUtils.java|  4 ++--
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
index 769b2287941..dcbbf70888d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static org.apache.beam.sdk.util.ByteBuddyUtils.getClassLoadingStrategy;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.Constructor;
@@ -32,11 +33,10 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.asm.AsmVisitorWrapper;
+import net.bytebuddy.asm.AsmVisitorWrapper.ForDeclaredMethods;
 import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.Implementation;
 import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
@@ -227,7 +227,7 @@ public class AutoValueUtils {
 
   private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
 
-  static SchemaUserTypeCreator createBuilderCreator(
+  private static SchemaUserTypeCreator createBuilderCreator(
   Class builderClass,
   List setterMethods,
   Method buildMethod,
@@ -242,9 +242,9 @@ public class AutoValueUtils {
   .intercept(
   new BuilderCreateInstruction(types, setterMethods, 
builderClass, buildMethod));
   return builder
-  .visit(new 
AsmVisitorWrapper.ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
+  .visit(new 
ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
   .make()
-  .load(ReflectHelpers.findClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
+  .load(ReflectHelpers.findClassLoader(), 
getClassLoadingStrategy(builderClass))
   .getLoaded()
   .getDeclaredConstructor()
   .newInstance();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 66b0a591057..93875a20707 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -32,7 +32,6 @@ import net.bytebuddy.asm.AsmVisitorWrapper;
 import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.dynamic.scaffold.InstrumentedType;
 import net.bytebuddy.implementation.FixedValue;
 import net.bytebuddy.implementation.Implementation;
@@ -253,7 +252,7 @@ public class POJOUtils {
   return builder
   .visit(new 
AsmVisitorWrapper.ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
   .make()
-  .load(ReflectHelpers.findClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
+  .load(ReflectHelpers.findClassLoader(), 
getClassLoadingStrategy(clazz))
   .getLoaded()
   .getDeclaredConstructor()
   .newInstance();
diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroByteBuddyUtils.java
 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroByteBuddyUtils.java
index e07f6ffb468..0a82663c177 100644
--- 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroByteBuddyUtils.java

(beam) branch master updated: Implementing lull reporting at bundle level processing (#29882)

2024-02-26 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ffe2dba5320 Implementing lull reporting at bundle level processing 
(#29882)
ffe2dba5320 is described below

commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf
Author: Arvind Ram 
AuthorDate: Mon Feb 26 12:23:45 2024 -0800

Implementing lull reporting at bundle level processing (#29882)
---
 .../core/metrics/ExecutionStateTracker.java|  25 
 .../dataflow/worker/DataflowExecutionContext.java  | 117 -
 .../dataflow/worker/DataflowOperationContext.java  |  80 +---
 .../runners/dataflow/worker/StackTraceUtil.java|  66 ++
 .../worker/DataflowExecutionStateTrackerTest.java  | 140 -
 .../worker/DataflowOperationContextTest.java   |  80 ++--
 6 files changed, 353 insertions(+), 155 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index dc6fd2f8248..b0b8f0107f3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,6 +46,7 @@ public class ExecutionStateTracker implements 
Comparable
   new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final long BUNDLE_LULL_REPORT_MS = 
TimeUnit.MINUTES.toMillis(10);
   private static final AtomicIntegerFieldUpdater 
SAMPLING_UPDATER =
   AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, 
"sampling");
 
@@ -139,8 +140,17 @@ public class ExecutionStateTracker implements 
Comparable
*/
   private volatile long millisSinceLastTransition = 0;
 
+  /**
+   * The number of milliseconds since the {@link ExecutionStateTracker} 
initial state.
+   *
+   * This variable is updated by the Sampling thread, and read by the 
Progress Reporting thread,
+   * thus it is marked volatile.
+   */
+  private volatile long millisSinceBundleStart = 0;
+
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
+  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
 this.sampler = sampler;
@@ -155,8 +165,10 @@ public class ExecutionStateTracker implements 
Comparable
 currentState = null;
 numTransitions = 0;
 millisSinceLastTransition = 0;
+millisSinceBundleStart = 0;
 transitionsAtLastSample = 0;
 nextLullReportMs = LULL_REPORT_MS;
+nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   @VisibleForTesting
@@ -335,6 +347,19 @@ public class ExecutionStateTracker implements 
Comparable
   transitionsAtLastSample = transitionsAtThisSample;
 }
 updateMillisSinceLastTransition(millisSinceLastSample, state);
+updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  // Override this to implement bundle level lull reporting.
+  protected void reportBundleLull(long millisSinceBundleStart) {}
+
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+millisSinceBundleStart += millisSinceLastSample;
+if (millisSinceBundleStart > nextBundleLullReportMs) {
+  reportBundleLull(millisSinceBundleStart);
+  nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
+}
   }
 
   @SuppressWarnings("NonAtomicVolatileUpdate")
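
A minimal sketch of hooking into the new callback; the subclass and its
logging are illustrative only (the Dataflow-specific handling is in the
DataflowExecutionContext diff that follows):

    import java.util.concurrent.TimeUnit;
    import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
    import org.apache.beam.runners.core.metrics.ExecutionStateTracker;

    public class BundleLullLoggingTracker extends ExecutionStateTracker {
      public BundleLullLoggingTracker(ExecutionStateSampler sampler) {
        super(sampler);
      }

      @Override
      protected void reportBundleLull(long millisSinceBundleStart) {
        // Invoked from the sampling thread once the bundle has been running
        // longer than BUNDLE_LULL_REPORT_MS, then again each further interval.
        System.err.printf(
            "Bundle running for %d min without completing%n",
            TimeUnit.MILLISECONDS.toMinutes(millisSinceBundleStart));
      }
    }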
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 080fa7c9dac..16ff2975b02 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
@@ -29,6 +30,8 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
 import java.util.stream.Collectors;
 import javax.a

(beam) branch master updated (67f930048d3 -> b78a3e1790f)

2024-02-21 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 67f930048d3 Bump github.com/docker/docker in /sdks (#30308)
 add b78a3e1790f fix flaky StreamingEngineClientTest (#30322)

No new revisions were added by this update.

Summary of changes:
 .../client/grpc/StreamingEngineClient.java |  19 ++--
 .../client/grpc/StreamingEngineClientTest.java | 115 +
 2 files changed, 78 insertions(+), 56 deletions(-)



(beam) branch master updated: [Dataflow Streaming] Add Channelz status page exporting GRPC channelz data (#30211)

2024-02-16 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 3693174c042 [Dataflow Streaming] Add Channelz status page exporting 
GRPC channelz data (#30211)
3693174c042 is described below

commit 3693174c0421d0ff049042ca283db633431892ef
Author: Arun Pandian 
AuthorDate: Fri Feb 16 03:51:01 2024 -0800

[Dataflow Streaming] Add Channelz status page exporting GRPC channelz data 
(#30211)

Co-authored-by: Arun Pandian 
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../options/DataflowPipelineDebugOptions.java  |   6 +
 .../dataflow/worker/StreamingDataflowWorker.java   |  10 +
 .../worker/windmill/WindmillServerBase.java|   6 +
 .../worker/windmill/WindmillServerStub.java|   6 +
 .../windmill/client/grpc/ChannelzServlet.java  | 292 +
 .../windmill/client/grpc/GrpcDispatcherClient.java |   4 +
 .../windmill/client/grpc/GrpcWindmillServer.java   |   5 +
 .../client/grpc/stubs/WindmillChannelFactory.java  |   2 +
 .../dataflow/worker/FakeWindmillServer.java|  22 +-
 .../windmill/client/grpc/ChannelzServletTest.java  | 104 
 11 files changed, 454 insertions(+), 5 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index fe80b826c56..2376a2c9bbc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin {
 testcontainers_rabbitmq : 
"org.testcontainers:rabbitmq:$testcontainers_version",
 truth   : 
"com.google.truth:truth:1.1.5",
 threetenbp  : 
"org.threeten:threetenbp:1.6.8",
-vendored_grpc_1_60_1: 
"org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
+vendored_grpc_1_60_1: 
"org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
 vendored_guava_32_1_2_jre   : 
"org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
 vendored_calcite_1_28_0 : 
"org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
 woodstox_core_asl   : 
"org.codehaus.woodstox:woodstox-core-asl:4.4.1",
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 888b0d3f0b6..9b06fa9b7e2 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -311,6 +311,12 @@ public interface DataflowPipelineDebugOptions
 
   void setWindmillGetDataStreamCount(int value);
 
+  @Description("If true, will only show windmill service channels on 
/channelz")
+  @Default.Boolean(true)
+  boolean getChannelzShowOnlyWindmillServiceChannels();
+
+  void setChannelzShowOnlyWindmillServiceChannels(boolean value);
+
   /**
* The amount of time before UnboundedReaders are considered idle and closed 
during streaming
* execution.
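
For reference, a short sketch of toggling the new flag through pipeline
options (the option defaults to true per the annotation above; the class name
is illustrative):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ChannelzFlagExample {
      public static void main(String[] args) {
        DataflowPipelineDebugOptions options =
            PipelineOptionsFactory.fromArgs(args)
                .as(DataflowPipelineDebugOptions.class);
        // Show every gRPC channel on /channelz, not only windmill service ones.
        options.setChannelzShowOnlyWindmillServiceChannels(false);
      }
    }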
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 825c3fb78c7..2e0156bae77 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -114,6 +114,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApp
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
 import 
org.apache.beam.r

(beam) branch master updated: Update windmill proto definition (#30046)

2024-02-15 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 690a5a44605 Update windmill proto definition (#30046)
690a5a44605 is described below

commit 690a5a446054281f72fa4c141c8e90e296663c70
Author: martin trieu 
AuthorDate: Thu Feb 15 11:56:46 2024 -0800

Update windmill proto definition (#30046)

* make external and internal windmill proto defs identical
* override authority of grpc channel for direct path
* Do not inject WindmillServer from deps, create in StreamingDataflowWorker
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  96 +++
 .../options/StreamingDataflowWorkerOptions.java|  33 
 .../streaming/WorkHeartbeatResponseProcessor.java  |  68 
 .../worker/windmill/WindmillEndpoints.java | 178 -
 .../worker/windmill/WindmillServerStub.java|   6 -
 .../worker/windmill/WindmillServiceAddress.java|  28 +++-
 .../windmill/client/grpc/GrpcDispatcherClient.java | 178 +++--
 .../windmill/client/grpc/GrpcWindmillServer.java   | 110 +++--
 .../client/grpc/GrpcWindmillStreamFactory.java |  17 +-
 .../client/grpc/StreamingEngineClient.java |  23 +--
 .../grpc/stubs/RemoteWindmillStubFactory.java  |  76 +
 .../client/grpc/stubs/WindmillChannelFactory.java  |  40 -
 .../client/grpc/stubs/WindmillStubFactory.java |  62 +--
 .../dataflow/worker/FakeWindmillServer.java|  19 ++-
 .../worker/StreamingDataflowWorkerTest.java| 161 ---
 .../grpc/GrpcGetWorkerMetadataStreamTest.java  |  30 ++--
 .../client/grpc/GrpcWindmillServerTest.java|  29 +++-
 .../client/grpc/StreamingEngineClientTest.java |  48 --
 .../windmill/testing/FakeWindmillStubFactory.java  |  47 ++
 .../worker/windmill/src/main/proto/windmill.proto  |  12 +-
 .../windmill/src/main/proto/windmill_service.proto |  10 +-
 21 files changed, 806 insertions(+), 465 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 06450e60fc0..825c3fb78c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -57,6 +57,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.servlet.http.HttpServletRequest;
@@ -96,6 +97,7 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
+import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
 import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -104,13 +106,16 @@ import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
+import
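
The second bullet above, overriding the channel authority for direct path,
corresponds to a standard gRPC channel-builder call; a minimal sketch with
made-up endpoint and authority values:

    import io.grpc.ManagedChannel;
    import io.grpc.netty.NettyChannelBuilder;

    public class AuthorityOverrideExample {
      public static void main(String[] args) {
        // Dial a direct-path address while keeping the logical service name
        // as the authority, so TLS verification and routing still match it.
        ManagedChannel channel =
            NettyChannelBuilder.forTarget("10.0.0.5:443")
                .overrideAuthority("windmill.example.googleapis.com")
                .build();
        channel.shutdownNow();
      }
    }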

(beam) branch master updated (6377cbe20cc -> 5c6557d114a)

2024-02-15 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 6377cbe20cc [Docs] Add example notebook for enrichment transform with 
bigtable (#30315)
 add 5c6557d114a [Dataflow Streaming] Move a few options from 
StreamingDataflowWorkerOptions to DataflowPipelineDebugOptions (#30326)

No new revisions were added by this update.

Summary of changes:
 .../options/DataflowPipelineDebugOptions.java  | 45 ++
 .../options/StreamingDataflowWorkerOptions.java| 45 --
 2 files changed, 45 insertions(+), 45 deletions(-)



(beam) branch master updated: check for cachetoken representing a retry before activating and completing work (#29082)

2024-02-13 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 514b03bebcc check for cachetoken representing a retry before 
activating and completing work (#29082)
514b03bebcc is described below

commit 514b03bebcc1addf39d2250c09c6aa42ee68b3db
Author: martin trieu 
AuthorDate: Tue Feb 13 04:41:41 2024 -0800

check for cachetoken representing a retry before activating and completing 
work (#29082)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  40 ++--
 .../dataflow/worker/streaming/ActiveWorkState.java | 161 +++
 .../worker/streaming/ComputationState.java |  14 +-
 .../runners/dataflow/worker/streaming/Work.java|  10 +
 .../runners/dataflow/worker/streaming/WorkId.java  |  48 +
 .../worker/StreamingDataflowWorkerTest.java|  94 +++--
 .../worker/streaming/ActiveWorkStateTest.java  | 216 -
 7 files changed, 418 insertions(+), 165 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 2f9e18cde67..4d2ef6a03cf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -87,7 +87,6 @@ import 
org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
 import 
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
-import 
org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
 import org.apache.beam.runners.dataflow.worker.streaming.Commit;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
@@ -97,6 +96,7 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
+import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
@@ -104,6 +104,7 @@ import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
@@ -1311,7 +1312,7 @@ public class StreamingDataflowWorker {
 // Consider the item invalid. It will eventually be retried by 
Windmill if it still needs to
 // be processed.
 computationState.completeWorkAndScheduleNextWorkForKey(
-ShardedKey.create(key, workItem.getShardingKey()), 
workItem.getWorkToken());
+ShardedKey.create(key, workItem.getShardingKey()), work.id());
   }
 } finally {
   // Update total processing time counters. Updating in finally clause 
ensures that
@@ -1389,7 +1390,10 @@ public class StreamingDataflowWorker {
 for (Windmill.WorkItemCommitRequest workRequest : 
entry.getValue().getRequestsList()) {
   computationState.completeWorkAndScheduleNextWorkForKey(
   ShardedKey.create(workRequest.getKey(), 
workRequest.getShardingKey()),
-  workRequest.getWorkToken());
+  WorkId.builder()
+  .setCacheToken(workRequest.getCacheToken())
+  .setWorkToken(workRequest.getWorkToken())
+  .build());
 }
   }
 }
@@ -1409,7 +1413,11 @@ public class StreamingDataflowWorker {
   .forComputation(state.getComputationId())
   .invalidate(request.getKey(), request.getShardingKey());
   state.completeWorkAndScheduleNextWorkForKey
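
The hunks above replace bare work tokens with a composite id built from both
the work token and the cache token, so a retried work item with a new cache
token no longer matches a stale queued one. A plain-Java stand-in for that
value class (the committed WorkId uses a generated builder; this sketch only
mirrors the equality semantics):

    import java.util.Objects;

    public final class WorkIdSketch {
      private final long workToken;
      private final long cacheToken;

      private WorkIdSketch(long workToken, long cacheToken) {
        this.workToken = workToken;
        this.cacheToken = cacheToken;
      }

      public static WorkIdSketch of(long workToken, long cacheToken) {
        return new WorkIdSketch(workToken, cacheToken);
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof WorkIdSketch)) {
          return false;
        }
        WorkIdSketch other = (WorkIdSketch) o;
        return workToken == other.workToken && cacheToken == other.cacheToken;
      }

      @Override
      public int hashCode() {
        return Objects.hash(workToken, cacheToken);
      }
    }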

(beam) branch master updated: fix bug in getProcessingTimesByStepCopy (#30270)

2024-02-13 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b923a67369e fix bug in getProcessingTimesByStepCopy (#30270)
b923a67369e is described below

commit b923a67369e0707575f974aa4cba700091aeb916
Author: clmccart 
AuthorDate: Tue Feb 13 01:22:11 2024 -0800

fix bug in getProcessingTimesByStepCopy (#30270)


Co-authored-by: Claire McCarthy 
---
 .../dataflow/worker/DataflowExecutionContext.java  | 42 +++---
 1 file changed, 29 insertions(+), 13 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 7d45295b2d8..080fa7c9dac 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -29,6 +29,8 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StepContext;
@@ -177,6 +179,7 @@ public abstract class DataflowExecutionContext {
 
   /** Dataflow specific {@link StepContext}. */
   public abstract static class DataflowStepContext implements StepContext {
+
 private final NameContext nameContext;
 
 public DataflowStepContext(NameContext nameContext) {
@@ -253,10 +256,13 @@ public abstract class DataflowExecutionContext {
  * Metadata on the message whose processing is currently being managed by 
this tracker. If no
  * message is actively being processed, activeMessageMetadata will be null.
  */
-@Nullable private ActiveMessageMetadata activeMessageMetadata = null;
+@GuardedBy("this")
+@Nullable
+private ActiveMessageMetadata activeMessageMetadata = null;
 
 private final MillisProvider clock = System::currentTimeMillis;
 
+@GuardedBy("this")
 private final Map processingTimesByStep = 
new HashMap<>();
 
 public DataflowExecutionStateTracker(
@@ -313,20 +319,19 @@ public abstract class DataflowExecutionContext {
   if (isDataflowProcessElementState) {
 DataflowExecutionState newDFState = (DataflowExecutionState) newState;
 if (newDFState.getStepName() != null && 
newDFState.getStepName().userName() != null) {
-  if (this.activeMessageMetadata != null) {
-recordActiveMessageInProcessingTimesMap();
+  recordActiveMessageInProcessingTimesMap();
+  synchronized (this) {
+this.activeMessageMetadata =
+ActiveMessageMetadata.create(
+newDFState.getStepName().userName(), clock.getMillis());
   }
-  this.activeMessageMetadata =
-  
ActiveMessageMetadata.create(newDFState.getStepName().userName(), 
clock.getMillis());
 }
 elementExecutionTracker.enter(newDFState.getStepName());
   }
 
   return () -> {
 if (isDataflowProcessElementState) {
-  if (this.activeMessageMetadata != null) {
-recordActiveMessageInProcessingTimesMap();
-  }
+  recordActiveMessageInProcessingTimesMap();
   elementExecutionTracker.exit();
 }
 baseCloseable.close();
@@ -337,12 +342,21 @@ public abstract class DataflowExecutionContext {
   return this.workItemId;
 }
 
-public Optional getActiveMessageMetadata() {
+public synchronized Optional 
getActiveMessageMetadata() {
   return Optional.ofNullable(activeMessageMetadata);
 }
 
-public Map getProcessingTimesByStepCopy() {
-  Map processingTimesCopy = 
processingTimesByStep;
+public synchronized Map 
getProcessingTimesByStepCopy() {
+  Map processingTimesCopy =
+  processingTimesByStep.entrySet().stream()
+  .collect(
+  Collectors.toMap(
+  e -> e.getKey(),
+  e -> {
+IntSummaryStatistics clone = new 
IntSummaryStatistics();
+clone.combine(e.getValue());
+return clone;
+  }));
   return processingTimesCopy;
 }
 
@@ -351,17 +365,19 @@ public abstract class DataflowExecutionContext {
  * processing times map. Sets the activeMessageMetadata to null after the 
entry has been
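
The fix replaces a reference copy with a per-entry deep copy, so the returned
snapshot cannot be mutated by the tracker afterwards. The same idiom in
isolation (class and method names are illustrative):

    import java.util.HashMap;
    import java.util.IntSummaryStatistics;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class StatsSnapshotExample {
      // Deep-copies each IntSummaryStatistics so callers can read a stable
      // snapshot while the live map keeps accumulating on another thread.
      static Map<String, IntSummaryStatistics> copy(Map<String, IntSummaryStatistics> src) {
        return src.entrySet().stream()
            .collect(
                Collectors.toMap(
                    Map.Entry::getKey,
                    e -> {
                      IntSummaryStatistics clone = new IntSummaryStatistics();
                      clone.combine(e.getValue()); // copies count/sum/min/max
                      return clone;
                    }));
      }

      public static void main(String[] args) {
        Map<String, IntSummaryStatistics> live = new HashMap<>();
        live.computeIfAbsent("stepA", k -> new IntSummaryStatistics()).accept(42);
        Map<String, IntSummaryStatistics> snapshot = copy(live);
        live.get("stepA").accept(7); // does not affect the snapshot
        System.out.println(snapshot.get("stepA").getCount()); // prints 1
      }
    }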
 

(beam) branch master updated: Remove unused StreamingDataflowWorker parameter (#30256)

2024-02-12 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 371576a3b17 Remove unused StreamingDataflowWorker parameter (#30256)
371576a3b17 is described below

commit 371576a3b17b940380192378848dd00c55d0cc19
Author: martin trieu 
AuthorDate: Mon Feb 12 06:33:20 2024 -0800

Remove unused StreamingDataflowWorker parameter (#30256)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  2 +-
 .../worker/StreamingModeExecutionContext.java  | 23 ++---
 .../dataflow/worker/counters/NameContext.java  | 29 --
 .../dataflow/worker/streaming/StageInfo.java   |  7 +++---
 .../worker/DataflowExecutionContextTest.java   |  9 +++
 .../worker/StreamingModeExecutionContextTest.java  | 14 +++
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  4 +--
 7 files changed, 46 insertions(+), 42 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index bca14923cfc..2f9e18cde67 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -991,7 +991,7 @@ public class StreamingDataflowWorker {
 
 StageInfo stageInfo =
 stageInfoMap.computeIfAbsent(
-mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName(), this));
+mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName()));
 
 ExecutionState executionState = null;
 String counterName = "dataflow_source_bytes_processed-" + 
mapTask.getSystemName();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 5b18e29293e..2e9e7e608a5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
@@ -440,21 +441,18 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext
   * Examples: "MapElements/Map", 
"BigShuffle.GroupByFirstNBytes/GroupByKey/Reify"
*/
   public abstract @Nullable String userName();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+public abstract Builder setStageName(String value);
+
+public abstract Builder setOriginalName(@Nullable String value);
+
+public abstract Builder setSystemName(@Nullable String value);
+
+public abstract Builder setUserName(@Nullable String value);
+
+public abstract NameContext build();
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index 64c97dcac51..cb6cbec7d4b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -39,15 +39,14 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterab
 /** Contains a few of the stage specific fields. E.g. metrics container 
registry, counters etc. */
 @AutoValue
 public abstract class StageInfo {
-  public static StageInfo create(
-  String stageName, String systemName, StreamingDataflowWorker worker) {
-NameContext nameContext = NameContext.create(stageName, null, systemName, 
null);
+  public static StageInfo create(String stageName, S

svn commit: r67270 - /dev/beam/vendor/grpc-1_60_1/0.2/

2024-02-09 Thread scwhittle
Author: scwhittle
Date: Fri Feb  9 12:50:53 2024
New Revision: 67270

Log:
Add vendored grpc-1_60_1 0.2

Added:
dev/beam/vendor/grpc-1_60_1/0.2/

dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip
   (with props)

dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.asc

dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.sha512

Added: 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip
==
Binary file - no diff available.

Propchange: 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip
--
svn:mime-type = application/octet-stream

Added: 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.asc
==
--- 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.asc
 (added)
+++ 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.asc
 Fri Feb  9 12:50:53 2024
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEEknCCeZHC9kOgdMKQ/P0VKBG/FXgFAmXGHlQACgkQ/P0VKBG/
+FXiFphAAjYSx8wl8K48nrkNAIaTegTxkF65mT/WKnIIfSG2plQuaONSjfjZs0/am
+EKIGqSuAQ/frbAQZzKpxk9yltmZ63VDC00lF8V28vvTS8tipMHfxdKgCkoSy6D69
+5bUtwLCUmc/+LRe/ZcRrxklsUA+2vJ43Ukr2dlmGDpxQnL1peSu892xZ+RgVYeGn
+zg2zJhYWdYOP9v4iKtbWMdRyhdmxCcxeoJkFzEFKo3gAjI8lCYzZPS/N7qMu5pZh
+LbI1lDii4Br2lNfNvrPmz/FjZPQ4KWQADDtjbMOjOB+ZGuVBu50ocxb3aWAT1dQo
+Rh98eLq+noe5DqEKaIXOSNez63VDQDGqsVVq0Z9fcEAlH4VnXc2Qp15peXVaKzQw
+uogC/oGd0qwMcNwnIFBVT5m2GNhJBFYdBDC64x/bWYhIWa1zq/fP0IpZTPxdpkaV
+d8X4RUUu5hpWgpeS1+f00ge37B3IAp26T+3uHo+gMEpVzdgS4rzNWnfcNn/UR++G
+FrabU9lnSvV8olVxOoOg8foU/NoI5e8ouiZP59GFTdRrQqsSerRoBtwCnzXtvOzp
+dNILUk341d1+CTWWH3izaZJDA26A0LVoj0CM4TFFtcez67/sW90Y/Iy+2x25kpvZ
+tZtML1JOzv5U6MbmteHYTYODI8dvXpHoNmE7OIRgzlBFoWPRr78=
+=JqHN
+-----END PGP SIGNATURE-----

Added: 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.sha512
==
--- 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.sha512
 (added)
+++ 
dev/beam/vendor/grpc-1_60_1/0.2/apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip.sha512
 Fri Feb  9 12:50:53 2024
@@ -0,0 +1 @@
+db84d3c6179b667016675e0b26a6129ae2f83c72948ae95a651a90023e005bea03f729ce81f8f84f866024fd5013bbef02b14e49dcc6d9f7ce9f31cb090e51f9
  apache-beam-2d08b32e674a1046ba7be0ae5f1e4b7b05b73488-source-release.zip




(beam) branch master updated (e9202abb7ad -> 2d08b32e674)

2024-02-09 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from e9202abb7ad Merge pull request #30161 [YAML] Add documentation to Beam 
site.
 add 2d08b32e674 Add options giving more isolation between grpc streaming 
rpcs. (#30233)

No new revisions were added by this update.

Summary of changes:
 .../google-cloud-dataflow-java/worker/build.gradle |   3 +-
 .../worker/MetricTrackingWindmillServerStub.java   |  81 -
 .../dataflow/worker/StreamingDataflowWorker.java   |   7 +-
 .../options/StreamingDataflowWorkerOptions.java|  19 +
 .../windmill/client/grpc/GrpcWindmillServer.java   |   3 +-
 .../client/grpc/stubs/IsolationChannel.java| 249 +
 .../client/grpc/stubs/WindmillChannelFactory.java  |   8 +-
 .../client/grpc/stubs/WindmillStubFactory.java |  17 +-
 .../client/grpc/stubs/IsolationChannelTest.java| 403 +
 9 files changed, 756 insertions(+), 34 deletions(-)
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java
 create mode 100644 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannelTest.java



(beam) branch master updated: Track windmill current active work budget. (#30048)

2024-02-08 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 24efe7619d9 Track windmill current active work budget. (#30048)
24efe7619d9 is described below

commit 24efe7619d9a0e5fae70a6d69beabb2d23f362b1
Author: martin trieu 
AuthorDate: Thu Feb 8 02:00:15 2024 -0800

Track windmill current active work budget. (#30048)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  6 +-
 .../dataflow/worker/streaming/ActiveWorkState.java | 79 +++-
 .../runners/dataflow/worker/streaming/Work.java|  2 -
 .../worker/streaming/ActiveWorkStateTest.java  | 83 +++---
 4 files changed, 142 insertions(+), 28 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 463ab953fae..e8ca3a2834f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1970,8 +1970,10 @@ public class StreamingDataflowWorker {
   failedWork
   .computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new 
ArrayList<>())
   .add(
-  new FailedTokens(
-  heartbeatResponse.getWorkToken(), 
heartbeatResponse.getCacheToken()));
+  FailedTokens.newBuilder()
+  .setWorkToken(heartbeatResponse.getWorkToken())
+  .setCacheToken(heartbeatResponse.getCacheToken())
+  .build());
 }
   }
   ComputationState state = 
computationMap.get(computationHeartbeatResponse.getComputationId());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index ff46356d956..b4b46932393 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.streaming;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
 
+import com.google.auto.value.AutoValue;
 import java.io.PrintWriter;
 import java.util.ArrayDeque;
 import java.util.Deque;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -38,6 +40,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import org.apache.beam.sdk.annotations.Internal;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -70,11 +73,19 @@ public final class ActiveWorkState {
   @GuardedBy("this")
   private final WindmillStateCache.ForComputation computationStateCache;
 
+  /**
+   * Current budget that is being processed or queued on the user worker. 
Incremented when work is
+   * activated in {@link #activateWorkForKey(ShardedKey, Work)}, and 
decremented when work is
+   * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, long)}.
+   */
+  private final AtomicReference activeGetWorkBudget;
+
   private ActiveWorkState(
   Map> activeWork,
   WindmillStateCache.ForComputation computationStateCache) {
 this.activeWork = activeWork;
 this.computationStateCache = computationStateCache;
+this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget());
   }
 
   static ActiveWorkState create(WindmillStateCache.ForComputation 
computationStateCache) {
@@ -88,6 +99,12 @@ public final class ActiveWorkState {
 return new ActiveWorkState(activeWork, computationStateCache);
   }
 
+  private
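
A generic sketch of the update pattern this commit introduces: an immutable
budget value swapped atomically as work is activated and completed (Budget
here is a stand-in for Beam's GetWorkBudget, not its real API):

    import java.util.concurrent.atomic.AtomicReference;

    public class ActiveBudgetExample {
      // Immutable value object, so each update swaps the whole reference.
      static final class Budget {
        final long items;
        final long bytes;

        Budget(long items, long bytes) {
          this.items = items;
          this.bytes = bytes;
        }

        Budget add(long i, long b) {
          return new Budget(items + i, bytes + b);
        }
      }

      private final AtomicReference<Budget> active =
          new AtomicReference<>(new Budget(0, 0));

      void onWorkActivated(long bytes) {
        active.updateAndGet(b -> b.add(1, bytes)); // queued or processing
      }

      void onWorkCompleted(long bytes) {
        active.updateAndGet(b -> b.add(-1, -bytes)); // finished or failed
      }

      public static void main(String[] args) {
        ActiveBudgetExample tracker = new ActiveBudgetExample();
        tracker.onWorkActivated(1024);
        tracker.onWorkCompleted(1024);
        System.out.println(tracker.active.get().items); // prints 0
      }
    }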

svn commit: r67230 - /dev/beam/KEYS

2024-02-07 Thread scwhittle
Author: scwhittle
Date: Wed Feb  7 09:30:52 2024
New Revision: 67230

Log:
Add key for scwhit...@apache.org

Modified:
dev/beam/KEYS

Modified: dev/beam/KEYS
==
--- dev/beam/KEYS (original)
+++ dev/beam/KEYS Wed Feb  7 09:30:52 2024
@@ -3769,3 +3769,62 @@ RrHIslA3VYLBsQHf8TOvTiaQvqdqUa6g2wFlL6iV
 dU847f1+6L6J9W2ZEHerGlJCSkeQFrhhmKRqGmHUcCxNkkKybjFLdixo
 =W5+j
 -----END PGP PUBLIC KEY BLOCK-----
+pub   rsa4096 2024-02-07 [SC]
+  9270827991C2F643A074C290FCFD152811BF1578
+uid   [ultimate] Samuel Carl Whittle (CODE SIGNING KEY) 

+sig 3FCFD152811BF1578 2024-02-07  Samuel Carl Whittle (CODE SIGNING 
KEY) 
+sub   rsa4096 2024-02-07 [E]
+sig  FCFD152811BF1578 2024-02-07  Samuel Carl Whittle (CODE SIGNING 
KEY) 
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQINBGXDRpEBEADMrqBmoQpOldarLYbLANpnI/pekme6lKKkz0kjfXCsxnehQnlh
+zPCzmJDGf+jqXDFZHcoRKNjGPZ3ImzFo4xA3/FPudg3Fr20RFsb8XSgp4pSEamcG
+ghUI8ueQUmzXohs9Mb19XJPhNJSaRsADqDqETBbACovo+Xgz7Y931rc15y34bmoz
++vYULi0mm3gt3VKJ09p7XNEivbRGst3Rgp0GyATJVAb5nSc8VVMgKzujVSZWZf/T
++dPrXDerRniqUPe3GbCg/Oav2aqAL5+V/7bmskrNrgokm8Qiwd8yqWMX4TqRj/vo
+79Hmcsfe+w1rYBnoghOjNZUryv4Qaekb1RA9pfYLik0RfnPDhPwDvqo27hJSRFTN
+QfDi3GgUoxTyzUEK2s2guxdukZlKXCcAq21Wx2nsM03ziiEdt1BgGA48d/Dm8plh
+ddPUfekcSG5aTjqeLV0GyUcVQikmyrkTz2DHYEaIg+RnVFOrzsU82xTbgYb41abm
+ZyFS28aWMs+OJtrbbZw9pEGtULgH8hmHoRfDxPQqb+40Pz+rDRDNTp9GN5m2+/VI
+WubIBMjALiDe2cA8fhH55O2jru7ek3Orv57GkbFOdOsP3iLTWPCC62SRueA2UNPs
+Q5oOTlbXQN/O4oB0eE0LRC+8Cn7XXAoRtVB2q/V3GN+k15UjLUYaSccyCwARAQAB
+tD1TYW11ZWwgQ2FybCBXaGl0dGxlIChDT0RFIFNJR05JTkcgS0VZKSA8c2N3aGl0
+dGxlQGFwYWNoZS5vcmc+iQJOBBMBCgA4FiEEknCCeZHC9kOgdMKQ/P0VKBG/FXgF
+AmXDRpECGwMFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQ/P0VKBG/FXh9TBAA
+mhmxNMUdK2csonydMVxFCYgvZvzB6/AMFDAClRFmwiCvA9YjqHLy84pYBnD87X3k
+PpaiOFr1QDTp6c2XD73+uhe07JNmFp6pZG082x41kCd935s5KvB4RvBwXlZBCuY+
+3X4+3cAv8cRZOOxr7mhA2MROcjk/poL7VcwZ0lIP9beoD8iJlK0TYXtlP1wtndQT
+WkfIS+GwNuCeR8We3nCcMtidr7rszXc9LqlsEeI3ynFbKqoJSJipIx1+qAISvfbe
+iWRDPK6sDrfapC6c+3qnGisNRpMrn7YJbtQayzzXAWhn3AKhTS0fktaJsf1NcbSw
+/iNqOaVsTFU7tJvbKwRto+kb6aO+E3mWG4Jnwz8Qq2AWsUmHTn47bluN/xJutKcj
+Rx2FB0oh7pMj4AbdBRnH85jtxP6pFvrffRmW+FskiueewScL1UQn4csaJJCFZpeP
+FL8RJUW9x5aKFMc7stLPDrJAWlQeCrQHB2iLUEzvxLpR8vqEa8yvesYGYUbS8Uxk
+M7GF8t7rKtw51Fg1aoqy6V59147WAxN7dqorjJl3PqSNfdJvvdStYjLSY2IWrZNs
+UdsgByhmKYzo9cMw9GvMxltbW4zA1Z4LDZr7uDe4tvTHOUfECMNxCP+mlwR6G/Ot
+IIqLTXP/PaYY+R11w2JO2O3y642ViPNGPa3pwDHskX+5Ag0EZcNGkQEQAKsrjcr/
+W+sCD/hI1nkqY5VoISBDrE6XF22PT+DCnAWYrfpdh1uS8+Jv6UKqDSqHjG8Ttnud
+YdPiwLcH+dRgDBkNWrWG2M+KZ9vnE+pgygXEMXvm3hvpDdPv9utxc26YRPB6brwQ
+I0lZZBO6km8CN/8jKdhE3jFiJxcgHLhfO3/1woKqQWlGK2Q2IWi6RxAIaxFOpvog
+NQ+64i5+xYHo/zziZ234WInNaxA4wA5chX/AX0ZAUX440ZEjACCwy5wJyN4T0tnr
+who4JdYbQP74oDxiIwCNZ4If6CnV6uAp9bbG4Y5elD7dycjlXwuaucMJsNyB4f2a
+zNeyy7gi1PRlEEL8MJpO6VoEV0qQKu8gUJOwSYIi3C6c7Ci80ugMDc5vsW0PnVRf
+T6PMaQcRtbVpJxeVlguiv8AMbbwVFuZ9SCKwYBfY5EosXVpgm2Mp7mrh6wX78ldb
+WlY2+hYz+6fSa99GzkE4ABy6sSnbEY24SxnfMqzpHY87Wkv7IPB8eJed7z5LfjJ7
+Oghuw9uew3cdtF752lJg7zA31HRNdu/7KKiUnGKYLOEZCsm+8jqIeBWIwOIJjm9n
+DTMArLsZMMJ5VjSq3WYPyap/NyeDsHB+sXhhunbKzeq0pLvLjIjNporIdIfNCaXe
+sePUjqJyJHDvsjx8qRJ04ra5WSNnAaafVlJRABEBAAGJAjYEGAEKACAWIQSScIJ5
+kcL2Q6B0wpD8/RUoEb8VeAUCZcNGkQIbDAAKCRD8/RUoEb8VeKVqD/0aDsRV/mB4
+UXr6h7ES5RUVK4uIduMSVP/yVCdHCyO0M7wj2UtIl1vdzSeh5Q1oIYO7lJ59yMo0
+x0MU9xpuZ1L3dZ5CMkaq627q1B1czEyvrdjWc9260oFg8XT4/Pwn0hqBV8JQZ+tU
+E6m4qdk3bB+2ji0bRxhZTNFnQG3KbAEfHrzq3sotRvQkAecP3vlnxSljoEalABVd
+E5QVXfI6kOhFW7YxXK/aqbFSGTOSMJXDSo3OZHML45ikxY28Asxcj4ZFTX/jyDP8
+/b9MDYeRa05tThWbUXp2tGenPVl6U2sw8Vs5HLhRMdeMG1lkKarjYB62hvcNQJRu
+6bKBipY31mmfSKeF/O+C82DsG7JoR4owngNTEeMe0OCWPhFuz+dSzuGEy4xX+ntL
+Y+16cRkgy+zd84nKpA4JJGBw9Daf36ERIJWTB89mJF0vNc3DZ0Hu4IbNpoXcOSwy
+BXou/aTd4z2gIwgR5WYBwHMLyLDBZEmHRc+ERwfFiaXk8ZDe9MRa3KR+AZwJSVJJ
+AZ15BUZ803QfQ5Y3M0dBUXyHcbJF3g7ae+f1IKWPWQYEsHLGGUqR0s3eNPORKfOU
+TB507dlnLs4lIhIGbIb75r45yDHftifuIJdAL1TIZK2LKH8WuZVV1zYe++pCVGLw
+wq5TRnJP/RWY0SQtT68rQhkr++ybjW1Jww==
+=V4/Y
+-----END PGP PUBLIC KEY BLOCK-----




(beam) branch master updated: Fix to take StreamingDataflowWorkerOptions from external options (#30232)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7fa4c99535f Fix to take StreamingDataflowWorkerOptions from external 
options (#30232)
7fa4c99535f is described below

commit 7fa4c99535f4c3ce8b72eae40bd29a288ed98cee
Author: Arun Pandian 
AuthorDate: Tue Feb 6 11:36:00 2024 -0800

Fix to take StreamingDataflowWorkerOptions from external options (#30232)

-

Co-authored-by: Arun Pandian 
---
 .../worker/DataflowBatchWorkerHarness.java |  2 +-
 .../worker/DataflowWorkerHarnessHelper.java|  8 +++---
 .../dataflow/worker/StreamingDataflowWorker.java   | 14 +-
 .../worker/WorkerPipelineOptionsFactory.java   | 15 +--
 .../worker/DataflowWorkerHarnessHelperTest.java| 30 +-
 .../worker/WorkerPipelineOptionsFactoryTest.java   |  4 +--
 6 files changed, 48 insertions(+), 25 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index cc79ac6dbc0..51127c2dc2f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -61,7 +61,7 @@ public class DataflowBatchWorkerHarness {
 
DataflowWorkerHarnessHelper.initializeLogging(DataflowBatchWorkerHarness.class);
 DataflowWorkerHarnessOptions pipelineOptions =
 DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-DataflowBatchWorkerHarness.class);
+DataflowBatchWorkerHarness.class, 
DataflowWorkerHarnessOptions.class);
 DataflowBatchWorkerHarness batchHarness = new 
DataflowBatchWorkerHarness(pipelineOptions);
 DataflowWorkerHarnessHelper.configureLogging(pipelineOptions);
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index c6d8d727ef4..94c894608a4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -49,11 +49,11 @@ public final class DataflowWorkerHarnessHelper {
   private static final String ROOT_LOGGER_NAME = "";
   private static final String PIPELINE_PATH = "PIPELINE_PATH";
 
-  public static DataflowWorkerHarnessOptions 
initializeGlobalStateAndPipelineOptions(
-  Class workerHarnessClass) throws Exception {
+  public static  T 
initializeGlobalStateAndPipelineOptions(
+  Class workerHarnessClass, Class harnessOptionsClass) throws 
Exception {
 /* Extract pipeline options. */
-DataflowWorkerHarnessOptions pipelineOptions =
-WorkerPipelineOptionsFactory.createFromSystemProperties();
+T pipelineOptions =
+
WorkerPipelineOptionsFactory.createFromSystemProperties(harnessOptionsClass);
 pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
 
 /* Configure logging with job-specific properties. */
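
The change generifies the helper with a class token so each harness can ask
for its own options interface; the same pattern in miniature (a sketch using
only public Beam SDK API):

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class OptionsTokenExample {
      // The caller supplies the concrete options interface instead of the
      // helper being hard-wired to a single type.
      static <T extends PipelineOptions> T createOptions(Class<T> clazz) {
        return PipelineOptionsFactory.as(clazz);
      }

      public static void main(String[] args) {
        PipelineOptions options = createOptions(PipelineOptions.class);
        System.out.println(options.getJobName());
      }
    }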
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 14efdcc5eb0..463ab953fae 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -64,7 +64,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import 
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames;
@@ -472,9 +471,9 @@ public class StreamingDataflowWorker {
 JvmInitializers.runOnStartup();
 
 
DataflowWorkerHarnessHelper.initializeLogging(Streaming

(beam) branch master updated: [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5494f114382 [Dataflow Streaming] Invalidate caches and remove work on 
failure before commit (#30229)
5494f114382 is described below

commit 5494f1143827e0e6fec9e331b93c00c83d10c66e
Author: Arun Pandian 
AuthorDate: Tue Feb 6 03:34:48 2024 -0800

[Dataflow Streaming] Invalidate caches and remove work on failure before 
commit (#30229)

* Invalidate caches and remove work on failure before commit
* Prevent completeWorkAndScheduleNextWorkForKey from throwing

-

Co-authored-by: Arun Pandian 
---
 .../dataflow/worker/StreamingDataflowWorker.java| 15 +++
 .../dataflow/worker/streaming/ActiveWorkState.java  | 17 +++--
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 3ba27bd852f..14efdcc5eb0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,12 +1397,21 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is 
consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream 
commitStream) {
 Preconditions.checkNotNull(commit);
+final ComputationState state = commit.computationState();
+final Windmill.WorkItemCommitRequest request = commit.request();
 // Drop commits for failed work. Such commits will be dropped by Windmill 
anyway.
 if (commit.work().isFailed()) {
+  readerCache.invalidateReader(
+  WindmillComputationKey.create(
+  state.getComputationId(), request.getKey(), 
request.getShardingKey()));
+  stateCache
+  .forComputation(state.getComputationId())
+  .invalidate(request.getKey(), request.getShardingKey());
+  state.completeWorkAndScheduleNextWorkForKey(
+  ShardedKey.create(request.getKey(), request.getShardingKey()), 
request.getWorkToken());
   return true;
 }
-final ComputationState state = commit.computationState();
-final Windmill.WorkItemCommitRequest request = commit.request();
+
 final int size = commit.getSize();
 commit.work().setState(Work.State.COMMITTING);
 activeCommitBytes.addAndGet(size);
@@ -1419,8 +1428,6 @@ public class StreamingDataflowWorker {
 .invalidate(request.getKey(), request.getShardingKey());
   }
   activeCommitBytes.addAndGet(-size);
-  // This may throw an exception if the commit was not active, which 
is possible if it
-  // was deemed stuck.
   state.completeWorkAndScheduleNextWorkForKey(
   ShardedKey.create(request.getKey(), request.getShardingKey()),
   request.getWorkToken());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 54942dfeee1..ff46356d956 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -188,16 +188,13 @@ public final class ActiveWorkState {
 
   private synchronized void removeCompletedWorkFromQueue(
   Queue workQueue, ShardedKey shardedKey, long workToken) {
-// avoid Preconditions.checkState here to prevent eagerly evaluating the
-// format string parameters for the error message.
-Work completedWork =
-Optional.ofNullable(workQueue.peek())
-.orElseThrow(
-() ->
-new IllegalStateException(
-String.format(
-"Active key %s without work, expected token %d",
-shardedKey, workToken)));
+Work completedWork = workQueue.peek();
+if (completedWork == null) {
+  // Work may have been completed due to clearing of stuck commits.
+  LOG.warn(
+  String.format("Active key %s without work, expected token %d", 
shardedKey, workToken));
+  return;
+}
 
 if (completedWork.getWorkItem().getWorkToken() != workToken) {
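
One nuance in the replacement above: LOG.warn(String.format(...)) still
formats eagerly. SLF4J's parameterized form defers rendering until WARN is
actually enabled; a sketch of that variant (not part of the commit):

    import java.util.Queue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LazyWarnExample {
      private static final Logger LOG = LoggerFactory.getLogger(LazyWarnExample.class);

      static <T> T peekExpected(Queue<T> queue, Object shardedKey, long workToken) {
        T head = queue.peek();
        if (head == null) {
          // Placeholders are rendered only if WARN logging is enabled.
          LOG.warn("Active key {} without work, expected token {}", shardedKey, workToken);
        }
        return head;
      }
    }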
 

(beam) branch revert-30215-fail-commit deleted (was 55a5b6b84f4)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch revert-30215-fail-commit
in repository https://gitbox.apache.org/repos/asf/beam.git


 was 55a5b6b84f4 Revert "When failing work items during commit, make sure 
to call completeWork…"

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(beam) branch master updated: Revert "When failing work items during commit, make sure to call completeWork…" (#30228)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 23dcb7ec1d5 Revert "When failing work items during commit, make sure 
to call completeWork…" (#30228)
23dcb7ec1d5 is described below

commit 23dcb7ec1d539759f5c587a6dfee357ad250db72
Author: Sam Whittle 
AuthorDate: Tue Feb 6 11:23:57 2024 +0100

Revert "When failing work items during commit, make sure to call 
completeWork…" (#30228)

This reverts commit b0f2eebb0244302ac2315dc260536512d229401f.
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 ++--
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index b48032677ff..3ba27bd852f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,20 +1397,12 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is 
consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream 
commitStream) {
 Preconditions.checkNotNull(commit);
-final ComputationState state = commit.computationState();
-final Windmill.WorkItemCommitRequest request = commit.request();
 // Drop commits for failed work. Such commits will be dropped by Windmill 
anyway.
 if (commit.work().isFailed()) {
-  readerCache.invalidateReader(
-  WindmillComputationKey.create(
-  state.getComputationId(), request.getKey(), 
request.getShardingKey()));
-  stateCache
-  .forComputation(state.getComputationId())
-  .invalidate(request.getKey(), request.getShardingKey());
-  state.completeWorkAndScheduleNextWorkForKey(
-  ShardedKey.create(request.getKey(), request.getShardingKey()), 
request.getWorkToken());
   return true;
 }
+final ComputationState state = commit.computationState();
+final Windmill.WorkItemCommitRequest request = commit.request();
 final int size = commit.getSize();
 commit.work().setState(Work.State.COMMITTING);
 activeCommitBytes.addAndGet(size);



(beam) branch revert-30215-fail-commit created (now 55a5b6b84f4)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch revert-30215-fail-commit
in repository https://gitbox.apache.org/repos/asf/beam.git


  at 55a5b6b84f4 Revert "When failing work items during commit, make sure 
to call completeWork…"

This branch includes the following new commits:

 new 55a5b6b84f4 Revert "When failing work items during commit, make sure 
to call completeWork…"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(beam) 01/01: Revert "When failing work items during commit, make sure to call completeWork…"

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch revert-30215-fail-commit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 55a5b6b84f4fdcd5363c60296c335a8ed27361ce
Author: Sam Whittle 
AuthorDate: Tue Feb 6 11:23:28 2024 +0100

Revert "When failing work items during commit, make sure to call 
completeWork…"

This reverts commit b0f2eebb0244302ac2315dc260536512d229401f.
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 ++--
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index b48032677ff..3ba27bd852f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,20 +1397,12 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is 
consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream 
commitStream) {
 Preconditions.checkNotNull(commit);
-final ComputationState state = commit.computationState();
-final Windmill.WorkItemCommitRequest request = commit.request();
 // Drop commits for failed work. Such commits will be dropped by Windmill 
anyway.
 if (commit.work().isFailed()) {
-  readerCache.invalidateReader(
-  WindmillComputationKey.create(
-  state.getComputationId(), request.getKey(), 
request.getShardingKey()));
-  stateCache
-  .forComputation(state.getComputationId())
-  .invalidate(request.getKey(), request.getShardingKey());
-  state.completeWorkAndScheduleNextWorkForKey(
-  ShardedKey.create(request.getKey(), request.getShardingKey()), 
request.getWorkToken());
   return true;
 }
+final ComputationState state = commit.computationState();
+final Windmill.WorkItemCommitRequest request = commit.request();
 final int size = commit.getSize();
 commit.work().setState(Work.State.COMMITTING);
 activeCommitBytes.addAndGet(size);



(beam) branch master updated (cd5f2711c83 -> b0f2eebb024)

2024-02-06 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from cd5f2711c83 Merge pull request #30141: Adding a default watermark emit 
interval for FlinkUnboundedSourceReader
 add b0f2eebb024 When failing work items during commit, make sure to call 
completeWork… (#30215)

No new revisions were added by this update.

Summary of changes:
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)



(beam) branch master updated (124880bc8ce -> 1f69271044d)

2024-02-02 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 124880bc8ce Fix typo affecting 
DataflowPipelineDebugOptions.setStreamingSideInputCacheExpirationMillis (#30195)
 add 1f69271044d [Dataflow Streaming] Add an option to use multiple commit 
threads (#30194)

No new revisions were added by this update.

Summary of changes:
 .../options/DataflowPipelineDebugOptions.java  |  6 +++
 .../dataflow/worker/StreamingDataflowWorker.java   | 49 --
 .../worker/StreamingDataflowWorkerTest.java| 46 +++-
 3 files changed, 77 insertions(+), 24 deletions(-)



(beam) branch master updated: Fix typo affecting DataflowPipelineDebugOptions.setStreamingSideInputCacheExpirationMillis (#30195)

2024-02-02 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 124880bc8ce Fix typo affecting 
DataflowPipelineDebugOptions.setStreamingSideInputCacheExpirationMillis (#30195)
124880bc8ce is described below

commit 124880bc8ce22a2ccbbc0efab435f64099ce65df
Author: Arun Pandian 
AuthorDate: Fri Feb 2 00:52:55 2024 -0800

Fix typo affecting 
DataflowPipelineDebugOptions.setStreamingSideInputCacheExpirationMillis (#30195)

Co-authored-by: Arun Pandian 
---
 .../beam/runners/dataflow/options/DataflowPipelineDebugOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 290418bd1cb..794dd76d243 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -258,7 +258,7 @@ public interface DataflowPipelineDebugOptions
   @Default.Integer(60 * 1000) // 1 minute
   Integer getStreamingSideInputCacheExpirationMillis();
 
-  void setstreamingSideInputCacheExpirationMillis(Integer value);
+  void setStreamingSideInputCacheExpirationMillis(Integer value);
 
   /**
* The amount of time before UnboundedReaders are considered idle and closed 
during streaming



(beam) branch master updated (5e7edc45598 -> 0a813b99657)

2024-01-23 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


from 5e7edc45598 Heartbeats (#29963)
 add 0a813b99657 Plumbing remaining autoscaling metrics (#30070)

No new revisions were added by this update.

Summary of changes:
 .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 2 +-
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 4 +++-
 .../beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java  | 8 +++-
 3 files changed, 11 insertions(+), 3 deletions(-)



(beam) branch master updated: Heartbeats (#29963)

2024-01-23 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5e7edc45598 Heartbeats (#29963)
5e7edc45598 is described below

commit 5e7edc45598b6438761386856e96a66487704b69
Author: Andrew Crites 
AuthorDate: Tue Jan 23 01:28:20 2024 -0800

Heartbeats (#29963)

* Adds sending new HeartbeatRequest protos to StreamingDataflowWorker. If 
any HeartbeatResponses are sent from Windmill containing failed work items, 
the worker aborts processing those work items as soon as possible (a minimal 
sketch of this early-abort pattern follows this commit message).

* Adds sending new HeartbeatRequest protos when using streaming RPCs 
(streaming engine). Also adds a test.

* Adds new test for custom source reader exiting early for failed work. 
Adds special exception for handling failed work.

* removes some extra cache invalidations and unneeded log statements.

* Added streaming_engine prefix to experiment enabling heartbeats and 
changed exception in state reader to be WorkItemFailedException.

* Adds check that heartbeat response sets failed before failing work.

* Adds ability to plumb experiments to test server for 
GrpcWindmillServerTest so we can test the new style heartbeats.

* Changes StreamingDataflowWorkerTest to look for latency attribution in 
new-style heartbeat requests since that's what FakeWindmillServer returns now.
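
A minimal, self-contained sketch of the early-abort pattern described above.
The Work/Stage names here are illustrative stand-ins (Work.isFailed() and
WorkItemCancelledException do appear in this change's file list), not the
actual worker classes:

import java.util.List;

public class FailedWorkSketch {
  // Stand-in for the worker's Work object; 'failed' is flipped when a
  // heartbeat response from Windmill reports the work item as failed.
  static class Work {
    volatile boolean failed;
    boolean isFailed() { return failed; }
  }

  static class WorkItemCancelledException extends RuntimeException {
    WorkItemCancelledException(String message) { super(message); }
  }

  interface Stage {
    void process(Work work);
  }

  // Check isFailed() between stages so a failed item is abandoned as soon
  // as possible instead of being processed (and committed) to completion.
  static void processWorkItem(Work work, List<Stage> stages) {
    for (Stage stage : stages) {
      if (work.isFailed()) {
        throw new WorkItemCancelledException("work item failed by heartbeat response");
      }
      stage.process(work);
    }
  }

  public static void main(String[] args) {
    processWorkItem(new Work(), List.of(w -> System.out.println("stage ran")));
  }
}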
---
 .../worker/MetricTrackingWindmillServerStub.java   |  32 ++--
 .../beam/runners/dataflow/worker/PubsubReader.java |   8 +
 .../dataflow/worker/StreamingDataflowWorker.java   |  51 +-
 .../worker/StreamingModeExecutionContext.java  |  14 +-
 .../dataflow/worker/UngroupedWindmillReader.java   |   8 +
 .../worker/WorkItemCancelledException.java |  39 +
 .../dataflow/worker/WorkerCustomSources.java   |   3 +-
 .../dataflow/worker/streaming/ActiveWorkState.java |  66 +++-
 .../worker/streaming/ComputationState.java |  14 +-
 .../runners/dataflow/worker/streaming/Work.java|  11 ++
 .../worker/windmill/WindmillServerStub.java|   6 +
 .../worker/windmill/client/WindmillStream.java |   5 +-
 .../windmill/client/grpc/GrpcGetDataStream.java| 107 ++---
 .../windmill/client/grpc/GrpcWindmillServer.java   |  35 +++-
 .../client/grpc/GrpcWindmillStreamFactory.java |  16 +-
 .../worker/windmill/state/WindmillStateCache.java  |   5 +
 .../worker/windmill/state/WindmillStateReader.java |  18 ++-
 .../dataflow/worker/FakeWindmillServer.java|  47 +-
 .../worker/StreamingDataflowWorkerTest.java|  76 -
 .../worker/StreamingModeExecutionContextTest.java  |   6 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  82 +-
 .../worker/streaming/ActiveWorkStateTest.java  |  50 +++---
 .../client/grpc/GrpcWindmillServerTest.java| 177 ++---
 .../worker/windmill/src/main/proto/windmill.proto  |  51 +-
 24 files changed, 801 insertions(+), 126 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index 0e929249b3a..800504f4451 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -239,25 +239,37 @@ public class MetricTrackingWindmillServerStub {
   }
 
   /** Tells windmill processing is ongoing for the given keys. */
-  public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
-    activeHeartbeats.set(active.size());
+  public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+    activeHeartbeats.set(heartbeats.size());
 try {
   if (useStreamingRequests) {
 // With streaming requests, always send the request even when it is 
empty, to ensure that
 // we trigger health checks for the stream even when it is idle.
 GetDataStream 

(beam) branch master updated: Worker message plumbing (#29879)

2024-01-18 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new e867ed7cdce Worker message plumbing (#29879)
e867ed7cdce is described below

commit e867ed7cdce74beda4cce630173e3c53ee28cc50
Author: Edward Cheng 
AuthorDate: Thu Jan 18 02:47:11 2024 -0800

Worker message plumbing (#29879)

* use StreamingScalingReport for autoscaling signals

* add unit test stub

* spotless apply

* comment test stub

* add unit test

* simplify response processing

* spotless apply

* add more reported metrics

* remove byte metrics

* fix bug

* fix DataflowWorkUnitClient test

* fix DataflowWorkUnitClient test

* formatting

* add check for scheduledtimer

* option to options

* fix timer check

* revert long to int change

* refactor timers

* var type fix

* fix timers refactoring

* use arraylist instead of map

* add timer to list

* fix comment

-

Co-authored-by: scwhittle 
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  2 +-
 .../dataflow/worker/DataflowWorkUnitClient.java| 42 ++
 .../dataflow/worker/StreamingDataflowWorker.java   | 89 ++
 .../runners/dataflow/worker/WorkUnitClient.java| 17 +
 .../dataflow/worker/util/BoundedQueueExecutor.java |  4 +-
 .../worker/DataflowWorkUnitClientTest.java | 24 ++
 6 files changed, 145 insertions(+), 33 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index c93af317b1a..a26cfaa457d 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -730,7 +730,7 @@ class BeamModulePlugin implements Plugin {
 google_api_services_bigquery: 
"com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version",
 // Keep version consistent with the version in 
google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom
 google_api_services_cloudresourcemanager: 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version",
-google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
+google_api_services_dataflow: 
"com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version",
 google_api_services_healthcare  : 
"com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version",
 google_api_services_pubsub  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
 // Keep version consistent with the version in google_cloud_nio, 
managed by google_cloud_platform_libraries_bom
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index ffa377fd3f8..bb39e3bd9af 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -31,14 +31,19 @@ import 
com.google.api.services.dataflow.model.LeaseWorkItemRequest;
 import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
+import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
+import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.op

(beam) branch master updated: Additional test for windmill OrderedListState implementation. (#29970)

2024-01-17 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 5a4cbe2074f Additional test for windmill OrderedListState 
implementation. (#29970)
5a4cbe2074f is described below

commit 5a4cbe2074f8d0eccdec620d63bda6ecc709b7c4
Author: Sam Whittle 
AuthorDate: Wed Jan 17 10:13:08 2024 +0100

Additional test for windmill OrderedListState implementation. (#29970)

These were attempting to reproduce a bug that ended up being in the test 
pipeline, but they seem worth keeping to improve coverage.
---
 .../windmill/state/WindmillStateInternalsTest.java | 108 -
 1 file changed, 107 insertions(+), 1 deletion(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d2590ceb846..e8eeff3b1d1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -1871,19 +1872,64 @@ public class WindmillStateInternalsTest {
 assertThat(orderedList.read(), Matchers.contains(goodbyeValue, worldValue, 
helloValue));
   }
 
+  @Test
+  public void testOrderedListAddBeforeRangeRead() throws Exception {
+StateTag> addr =
+StateTags.orderedList("orderedList", StringUtf8Coder.of());
+OrderedListState orderedList = underTest.state(NAMESPACE, addr);
+
+SettableFuture>> future = 
SettableFuture.create();
+Range readSubrange = Range.closedOpen(70 * 1000L, 100 * 1000L);
+when(mockReader.orderedListFuture(
+readSubrange, key(NAMESPACE, "orderedList"), STATE_FAMILY, 
StringUtf8Coder.of()))
+.thenReturn(future);
+
+orderedList.readRangeLater(Instant.ofEpochMilli(70), 
Instant.ofEpochMilli(100));
+
+final TimestampedValue helloValue =
+TimestampedValue.of("hello", Instant.ofEpochMilli(100));
+final TimestampedValue worldValue =
+TimestampedValue.of("world", Instant.ofEpochMilli(75));
+final TimestampedValue goodbyeValue =
+TimestampedValue.of("goodbye", Instant.ofEpochMilli(50));
+
+orderedList.add(helloValue);
+waitAndSet(future, Collections.singletonList(worldValue), 200);
+orderedList.add(goodbyeValue);
+
+assertThat(
+orderedList.readRange(Instant.ofEpochMilli(70), 
Instant.ofEpochMilli(100)),
+Matchers.contains(worldValue));
+  }
+
   @Test
   public void testOrderedListClearBeforeRead() throws Exception {
 StateTag> addr =
 StateTags.orderedList("orderedList", StringUtf8Coder.of());
 OrderedListState orderedListState = underTest.state(NAMESPACE, 
addr);
 
-final TimestampedValue helloElement = TimestampedValue.of("hello", 
Instant.EPOCH);
+final TimestampedValue helloElement =
+TimestampedValue.of("hello", Instant.ofEpochSecond(1));
 orderedListState.clear();
 orderedListState.add(helloElement);
 assertThat(orderedListState.read(), 
Matchers.containsInAnyOrder(helloElement));
+// Shouldn't need to read from windmill for this.
+Mockito.verifyZeroInteractions(mockReader);
 
+assertThat(
+orderedListState.readRange(Instant.ofEpochSecond(1), 
Instant.ofEpochSecond(2)),
+Matchers.containsInAnyOrder(helloElement));
 // Shouldn't need to read from windmill for this.
 Mockito.verifyZeroInteractions(mockReader);
+
+// Shouldn't need to read from windmill for this.
+assertThat(
+orderedListState.readRange(Instant.ofEpochSecond(100), 
Instant.ofEpochSecond(200)),
+Matchers.emptyIterable());
+assertThat(
+orderedListState.readRange(Instant.EPOCH, Instant.ofEpochSecond(1)),
+Matchers.emptyIterable());
+Mockito.verifyZeroInteractions(mockReader);
   }
 
   @Test
@@ -2201,6 +2247,66 @@ public class WindmillStateInternalsTest {
 assertArrayEquals(expected, read);
   }
 
+  @Test
+  public void testOrderedListInterleavedLocalAddClearReadRange() {
+Future, RangeSet>> orderedListFuture = 
Futures.immediateFuture(null);
+Future, RangeSet>> deletionsFuture = 
Futures.immediateFuture(null);
+wh

(beam) branch master updated: Add a read timeout and cache BigQueryIOMetadata (#29662)

2024-01-16 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new dd5bbb2d77d Add a read timeout and cache BigQueryIOMetadata (#29662)
dd5bbb2d77d is described below

commit dd5bbb2d77d7f3d057baad3d872677671c730345
Author: Sam Whittle 
AuthorDate: Tue Jan 16 09:56:10 2024 +0100

Add a read timeout and cache BigQueryIOMetadata (#29662)
---
 .../sdk/extensions/gcp/util/GceMetadataUtil.java   |  1 +
 .../sdk/io/gcp/bigquery/BigQueryIOMetadata.java| 38 --
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
index fd49b759fd6..e63aa7dc677 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -44,6 +44,7 @@ public class GceMetadataUtil {
 int timeoutMillis = 5000;
 final HttpParams httpParams = new BasicHttpParams();
 HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
+HttpConnectionParams.setSoTimeout(httpParams, timeoutMillis);
 String ret = "";
 try {
   HttpClient client = new DefaultHttpClient(httpParams);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
index 9cce436fe35..f8d261d3bf6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
@@ -18,19 +18,25 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Metadata class for BigQueryIO. i.e. to use as BQ job labels. */
 final class BigQueryIOMetadata {
 
-  private @Nullable String beamJobId;
+  private final @Nullable String beamJobId;
 
-  private @Nullable String beamJobName;
+  private final @Nullable String beamJobName;
 
-  private @Nullable String beamWorkerId;
+  private final @Nullable String beamWorkerId;
+
+  static final Supplier<BigQueryIOMetadata> INSTANCE =
+      Suppliers.memoizeWithExpiration(() -> refreshInstance(), 5, TimeUnit.MINUTES);
 
   private BigQueryIOMetadata(
   @Nullable String beamJobId, @Nullable String beamJobName, @Nullable 
String beamWorkerId) {
@@ -47,25 +53,21 @@ final class BigQueryIOMetadata {
* being used.
*/
   public static BigQueryIOMetadata create() {
-String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
-String dataflowJobName = GceMetadataUtil.fetchDataflowJobName();
-String dataflowWorkerId = GceMetadataUtil.fetchDataflowWorkerId();
+return INSTANCE.get();
+  }
 
+  private static BigQueryIOMetadata refreshInstance() {
+String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
 // If a Dataflow job id is returned on GCE metadata. Then it means
 // this program is running on a Dataflow GCE VM.
-boolean isDataflowRunner = !dataflowJobId.isEmpty();
-
-String beamJobId = null;
-String beamJobName = null;
-String beamWorkerId = null;
-if (isDataflowRunner) {
-  if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
-beamJobId = dataflowJobId;
-beamJobName = dataflowJobName;
-beamWorkerId = dataflowWorkerId;
-  }
+if (dataflowJobId.isEmpty() || 
!BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
+  return new BigQueryIOMetadata(null, null, null);
 }
-return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId);
+
+return new BigQueryIOMetadata(
+dataflowJobId,
+GceMetadataUtil.fetchDataflowJobName(),
+GceMetadataUtil.fetchDataflowWorkerId());
   }
 
  public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {
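
The caching above uses the vendored Guava Suppliers.memoizeWithExpiration.
As a rough, dependency-free illustration of the same memoize-with-expiration
idea (hypothetical class, not the Beam code):

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ExpiringSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private final long ttlNanos;
  private T value;
  private long expiresAtNanos;

  public ExpiringSupplier(Supplier<T> delegate, long ttl, TimeUnit unit) {
    this.delegate = delegate;
    this.ttlNanos = unit.toNanos(ttl);
  }

  @Override
  public synchronized T get() {
    long now = System.nanoTime();
    if (value == null || now >= expiresAtNanos) {
      value = delegate.get();           // refresh at most once per TTL window
      expiresAtNanos = now + ttlNanos;
    }
    return value;
  }

  public static void main(String[] args) {
    ExpiringSupplier<Long> cached = new ExpiringSupplier<>(System::nanoTime, 5, TimeUnit.MINUTES);
    System.out.println(cached.get().equals(cached.get())); // true: second call is served from cache
  }
}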



(beam) branch master updated: [Dataflow Streaming] Make SideInputCache bytes and expiry configurable (#29871)

2024-01-09 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new b666c6434dc [Dataflow Streaming] Make SideInputCache bytes and expiry 
configurable (#29871)
b666c6434dc is described below

commit b666c6434dc6e15079b10bdaf1d6a88b530bb714
Author: Arun Pandian 
AuthorDate: Tue Jan 9 03:40:52 2024 -0800

[Dataflow Streaming] Make SideInputCache bytes and expiry configurable 
(#29871)

Co-authored-by: Arun Pandian 
---
 .../dataflow/options/DataflowPipelineDebugOptions.java   | 16 ++--
 .../runners/dataflow/worker/StreamingDataflowWorker.java |  2 +-
 .../worker/streaming/sideinput/SideInputCache.java   | 11 ++-
 .../streaming/sideinput/SideInputStateFetcher.java   |  6 --
 .../streaming/sideinput/SideInputStateFetcherTest.java   | 14 +++---
 5 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index f8649a1f0f3..290418bd1cb 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -239,8 +239,8 @@ public interface DataflowPipelineDebugOptions
   /**
* The size of the worker's in-memory cache, in megabytes.
*
-   * Currently, this cache is used for storing read values of side inputs. 
as well as the state
-   * for streaming jobs.
+   * Currently, this cache is used for storing read values of side inputs 
in batch as well as the
+   * user state for streaming jobs.
*/
   @Description("The size of the worker's in-memory cache, in megabytes.")
   @Default.Integer(100)
@@ -248,6 +248,18 @@ public interface DataflowPipelineDebugOptions
 
   void setWorkerCacheMb(Integer value);
 
+  @Description("The size of the streaming worker's side input cache, in 
megabytes.")
+  @Default.Integer(100)
+  Integer getStreamingSideInputCacheMb();
+
+  void setStreamingSideInputCacheMb(Integer value);
+
+  @Description("The expiry for streaming worker's side input cache entries, in 
milliseconds.")
+  @Default.Integer(60 * 1000) // 1 minute
+  Integer getStreamingSideInputCacheExpirationMillis();
+
+  void setstreamingSideInputCacheExpirationMillis(Integer value);
+
   /**
* The amount of time before UnboundedReaders are considered idle and closed 
during streaming
* execution.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0ddafa25f86..f68e5ba26c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -424,7 +424,7 @@ public class StreamingDataflowWorker {
 this.metricTrackingWindmillServer =
 new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, 
windmillServiceEnabled);
 this.metricTrackingWindmillServer.start();
-this.sideInputStateFetcher = new 
SideInputStateFetcher(metricTrackingWindmillServer);
+this.sideInputStateFetcher = new 
SideInputStateFetcher(metricTrackingWindmillServer, options);
 this.clientId = clientIdGenerator.nextLong();
 
 for (MapTask mapTask : mapTasks) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
index 721c477435e..beb7c361d95 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputCache.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescri

(beam) branch master updated: Change pubsub message cap size from 10MiB to 10MB (#29791)

2023-12-21 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 7e4dc66ba20 Change pubsub message cap size from 10MiB to 10MB (#29791)
7e4dc66ba20 is described below

commit 7e4dc66ba208a8bc53c03f52945b95713b74fbaf
Author: Arun Pandian 
AuthorDate: Thu Dec 21 09:49:45 2023 -0800

Change pubsub message cap size from 10MiB to 10MB (#29791)

Pubsub's max allowed message size is 10 million bytes, not 10 * 2^20 
bytes: https://cloud.google.com/pubsub/quotas

Co-authored-by: Arun Pandian 
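
For reference, the gap between the two constants is easy to check (plain 
Java arithmetic, illustrative only):

public class PubsubLimitCheck {
  public static void main(String[] args) {
    int tenMiB = 10 << 20;   // 10 * 2^20 = 10,485,760 bytes (the old cap)
    int tenMB = 10_000_000;  // Pub/Sub's documented request limit
    System.out.println(tenMiB - tenMB); // prints 485760: the old cap exceeded the limit
  }
}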
---
 .../src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +-
 sdks/python/apache_beam/io/gcp/pubsub.py  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index f79299aea5f..e281e559a54 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -194,7 +194,7 @@ public class PubsubIO {
 
   private static final Pattern PUBSUB_NAME_REGEXP = 
Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
 
-  static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10 << 20;
+  static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10_000_000;
 
   private static final int PUBSUB_NAME_MIN_LENGTH = 3;
   private static final int PUBSUB_NAME_MAX_LENGTH = 255;
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index af58006d6e7..d0785fa1f21 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -150,7 +150,7 @@ class PubsubMessage(object):
   containing the payload of this object.
 """
 msg = pubsub.types.PubsubMessage()
-if len(self.data) > (10 << 20):
+if len(self.data) > (10_000_000):
   raise ValueError('A pubsub message data field must not exceed 10MB')
 msg.data = self.data
 
@@ -179,7 +179,7 @@ class PubsubMessage(object):
 msg.ordering_key = self.ordering_key
 
 serialized = pubsub.types.PubsubMessage.serialize(msg)
-if len(serialized) > (10 << 20):
+if len(serialized) > (10_000_000):
   raise ValueError(
   'Serialized pubsub message exceeds the publish request limit of 
10MB')
 return serialized



(beam) branch master updated (6a12e8b88df -> 61c53676a7b)

2023-12-21 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


 from 6a12e8b88df fix constructor names (#29855)
 add 61c53676a7b Dataflow Streaming: Add a pipeline option 
`--desiredNumUnboundedSourceSplits` to override the desired number of splits for 
CustomSources (#29704)

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/runners/dataflow/internal/CustomSources.java   |  4 
 .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 10 ++
 2 files changed, 14 insertions(+)



(beam) branch master updated: Add interfaces for direct path, and StreamingEngineClient (#28835)

2023-11-30 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 93fb2045ea1 Add interfaces for direct path, and StreamingEngineClient 
(#28835)
93fb2045ea1 is described below

commit 93fb2045ea1ddf42e5da49048302390a2db6e492
Author: martin trieu 
AuthorDate: Thu Nov 30 02:19:13 2023 -0800

Add interfaces for direct path, and StreamingEngineClient (#28835)

Add interfaces/classes for direct path:
ProcessWorkItemClient
Exposed to WorkItemProcessor to allow routing GetData and CommitWork 
stream RPCs to the same workers where GetWork was called (currently 
StreamingDataflowWorker#process).

WorkItemProcessor
Replaces WorkItemReceiver; the same, but takes and exposes a 
ProcessWorkItemClient instead of a WorkItem. Since ProcessWorkItemClient needs 
a way to get data for work, refresh work, get side input data, and commit 
work, the place where it is created (GrpcGetWorkStream) needs to be modified 
to accept a GetDataStream (for keyed/state data), a GetDataStream (for global 
side input data), and a CommitWorkStream.

GetWorkBudget
A struct to model item and byte budgets for how much work a user worker can 
handle. This is passed in GetWorkRequest(s) to Windmill to control how many 
items/bytes of Work are returned.

GetWorkBudgetDistributor
Given a set of WindmillStreamSender(s) and GetWorkBudget, distributes the 
budgets to the WindmillStreamSender(s) in some manner.

EvenGetWorkBudgetDistributor
GetWorkBudgetDistributor implementation that distributes the budget evenly

WindmillStreamSender
When the Grpc*Stream(s) are created, they immediately start the underlying 
grpc stream (startStream is called, and has protected access). To be able to 
assign budgets and get the streams ready to be started (similar to 
GetWorkClientSender), WindmillStreamSender wraps the 3 WorkItem API RPC 
streams, and exposes a startStream, and closeAllStreams to manage the 
underlying streams. Once the streams are started they are cached (via thread 
safe memoization). Once certain endpoints are stale [...]

DispatcherClient
Manages/vends out stubs and the dispatcher.
Thread safe via synchronization on reads and writes.

Add StreamingEngineClient

Manages the available backend Windmill workers via GetWorkerMetadata. We 
never close this stream. WorkerMetadata updates are submitted to a 
single-threaded executor, which consumes them and updates the 
StreamingEngineClient's internal connection state.
Given a total budget, divides it amongst the available backend Windmill 
workers (represented as Endpoints, Connections, and WindmillStreamSenders) 
and starts GetWorkStream(s). Closes streams via 
WindmillStreamSender#closeAllStreams when the endpoint for the stream is no 
longer available in updated worker metadata.
Contains a single-threaded executor for triggered budget refreshes. Budget 
refreshes are triggered when new worker metadata is consumed (implemented) or 
when work has completed processing (either committed back to Windmill or put 
in an inactive state). Uses a SynchronousQueue to implement a 
publish/subscribe pattern: put blocks until another thread take(s) from the 
queue (see the sketch after this commit message).
Contains a single-threaded executor for periodic budget refreshes.

Future changes still needed:
Have GrpcGetWorkStream accept a GrpcGetDataStream and GrpcCommitWorkStream 
so that it can construct a ProcessWorkItemClient and pass it on to the 
ProcessWorkItem (replacing the current behavior where the WorkItem is passed 
to the WorkItemReceiver).
Integrate with StreamingDataflowWorker; it might be worth having 2 different 
implementations of StreamingDataflowWorker since the current 
MetricTrackingWindmillServer is used for GetData (keyed and global) fetches.
Need to figure out a way to batch commits since they need to go to the same 
origin worker.
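
A minimal, runnable sketch of the SynchronousQueue publish/subscribe handoff
described above, using nothing beyond the JDK; the names and the printed
"budget" work are illustrative, not the actual StreamingEngineClient code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

public class BudgetRefreshSketch {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> refreshSignals = new SynchronousQueue<>();
    ExecutorService refresher = Executors.newSingleThreadExecutor();

    // Subscriber: the single-threaded budget refresher blocks on take().
    refresher.execute(() -> {
      try {
        while (true) {
          String reason = refreshSignals.take(); // blocks until a publisher put()s
          System.out.println("redistributing GetWork budget: " + reason);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // shutdownNow() interrupts the take()
      }
    });

    // Publishers: put() blocks until the refresher thread accepts the signal,
    // so a triggered refresh is handed off rather than queued up.
    refreshSignals.put("new worker metadata consumed");
    refreshSignals.put("work item finished processing");
    refresher.shutdownNow();
  }
}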
---
 .../worker/windmill/WindmillConnection.java|  57 +++
 .../client/grpc/GetWorkTimingInfosTracker.java |  10 +-
 .../windmill/client/grpc/GrpcCommitWorkStream.java |   4 +-
 .../client/grpc/GrpcDirectGetWorkStream.java   | 320 
 .../windmill/client/grpc/GrpcGetDataStream.java|   4 +-
 .../client/grpc/GrpcGetWorkerMetadataStream.java   |   3 +-
 .../client/grpc/GrpcWindmillStreamFactory.java |  28 +-
 .../client/grpc/StreamingEngineClient.java | 401 
 .../grpc/StreamingEngineConnectionState.java   |  64 
 .../windmill/client/grpc/WindmillStreamSender.java | 156 
 .../windmill/work/ProcessWorkItemClient.java   |  52 +++
 .../worker/windmill/work/WorkItemProcessor.java|  57 +++
 .../work/budget/EvenGetWorkBudgetDistributor.java  | 101 +
 .../worker/windmill/work/budget/GetWorkBudget.java |  35 +-
 .../work/budget

(beam) branch master updated: Change StateBackedIterable to implement ElementByteSizeObservableIterable avoiding iteration to estimate observe bytes. (#29517)

2023-11-23 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ce104769c4 Change StateBackedIterable to implement 
ElementByteSizeObservableIterable avoiding iteration to estimate observe bytes. 
(#29517)
6ce104769c4 is described below

commit 6ce104769c45b56a01760f8e6574e2290cd7c4e8
Author: Sam Whittle 
AuthorDate: Thu Nov 23 12:13:05 2023 +0100

Change StateBackedIterable to implement ElementByteSizeObservableIterable 
avoiding iteration to estimate observe bytes. (#29517)

* Change StateBackedIterable to implement ElementByteSizeObservableIterable 
reducing byte estimation costs.
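
A rough, self-contained illustration (hypothetical names, not the actual
StateBackedIterable code) of the underlying idea: report each element's byte
size as a side effect of normal iteration, so byte estimation does not
require a second pass over the iterable:

import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

public class ObservedIterable<T> implements Iterable<T> {
  private final List<T> elements;
  private final ToLongFunction<T> sizer; // e.g. a coder's byte-size estimate
  private final LongConsumer observer;   // receives each element's size

  public ObservedIterable(List<T> elements, ToLongFunction<T> sizer, LongConsumer observer) {
    this.elements = elements;
    this.sizer = sizer;
    this.observer = observer;
  }

  @Override
  public Iterator<T> iterator() {
    Iterator<T> it = elements.iterator();
    return new Iterator<T>() {
      @Override public boolean hasNext() { return it.hasNext(); }

      @Override public T next() {
        T value = it.next();
        observer.accept(sizer.applyAsLong(value)); // observed during the single pass
        return value;
      }
    };
  }

  public static void main(String[] args) {
    long[] total = {0};
    ObservedIterable<String> strings =
        new ObservedIterable<>(List.of("hello", "world"), s -> s.length(), n -> total[0] += n);
    for (String s : strings) {} // normal consumption
    System.out.println(total[0]); // 10, with no extra iteration for sizing
  }
}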
---
 .../beam/fn/harness/state/StateBackedIterable.java | 87 +-
 .../fn/harness/state/StateBackedIterableTest.java  | 58 +++
 2 files changed, 142 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
index 9c95e9ad90e..22e0822b619 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
@@ -43,12 +43,17 @@ import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
 import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link BeamFnStateClient state} backed iterable which allows for fetching 
elements over the
@@ -62,12 +67,17 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
-public class StateBackedIterable implements Iterable, Serializable {
+public class StateBackedIterable
+extends ElementByteSizeObservableIterable>
+implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StateBackedIterable.class);
 
   @VisibleForTesting final StateRequest request;
   @VisibleForTesting final List prefix;
   private final transient PrefetchableIterable suffix;
 
+  private final org.apache.beam.sdk.coders.Coder elemCoder;
+
   public StateBackedIterable(
   Cache cache,
   BeamFnStateClient beamFnStateClient,
@@ -81,11 +91,82 @@ public class StateBackedIterable implements Iterable, 
Serializable {
 this.suffix =
 StateFetchingIterators.readAllAndDecodeStartingFrom(
 Caches.subCache(cache, stateKey), beamFnStateClient, request, 
elemCoder);
+this.elemCoder = elemCoder;
+  }
+
+  @SuppressWarnings("nullness")
+  private static class WrappedObservingIterator extends 
ElementByteSizeObservableIterator {
+private final Iterator wrappedIterator;
+private final org.apache.beam.sdk.coders.Coder elementCoder;
+
+// Logically final and non-null but initialized after construction by 
factory method for
+// initialization ordering.
+private ElementByteSizeObserver observerProxy = null;
+
+private boolean observerNeedsAdvance = false;
+private boolean exceptionLogged = false;
+
+static  WrappedObservingIterator create(
+Iterator iterator, org.apache.beam.sdk.coders.Coder 
elementCoder) {
+  WrappedObservingIterator result = new 
WrappedObservingIterator<>(iterator, elementCoder);
+  result.observerProxy =
+  new ElementByteSizeObserver() {
+@Override
+protected void reportElementSize(long elementByteSize) {
+  result.notifyValueReturned(elementByteSize);
+}
+  };
+  return result;
+}
+
+private WrappedObservingIterator(
+Iterator iterator, org.apache.beam.sdk.coders.Coder 
elementCoder) {
+  this.wrappedIterator = iterator;
+  this.elementCoder = elementCoder;
+}
+
+@Override
+public boolean hasNext() {
+ 

(beam) branch master updated: organize and refactor GrpcWindmillServer. (#29156)

2023-10-31 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new ba714221d5e organize and refactor GrpcWindmillServer. (#29156)
ba714221d5e is described below

commit ba714221d5efaea58a59bccaad2eaeef70bd4ec4
Author: martin trieu 
AuthorDate: Tue Oct 31 02:37:10 2023 -0700

organize and refactor GrpcWindmillServer. (#29156)

organize and refactor GrpcWindmillServer to prepare for Streaming Engine 
Client changes.
---
 .../google-cloud-dataflow-java/worker/build.gradle |   3 +
 .../worker/MetricTrackingWindmillServerStub.java   |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   6 +-
 .../options/StreamingDataflowWorkerOptions.java|   2 +-
 .../worker/windmill/WindmillServerBase.java|   8 +-
 .../worker/windmill/WindmillServerStub.java|   8 +-
 .../{ => client}/AbstractWindmillStream.java   |   3 +-
 .../windmill/{ => client}/WindmillStream.java  |  21 +-
 .../windmill/{ => client}/WindmillStreamPool.java  |   2 +-
 .../grpc}/AppendableInputStream.java   |   2 +-
 .../grpc}/GetWorkTimingInfosTracker.java   |   2 +-
 .../grpc}/GrpcCommitWorkStream.java|   9 +-
 .../grpc}/GrpcDeadlineClientInterceptor.java   |   2 +-
 .../windmill/client/grpc/GrpcDispatcherClient.java | 136 +
 .../grpc}/GrpcGetDataStream.java   |  13 +-
 .../grpc}/GrpcGetDataStreamRequests.java   |   2 +-
 .../grpc}/GrpcGetWorkStream.java   |  29 +-
 .../grpc}/GrpcGetWorkerMetadataStream.java |   9 +-
 .../windmill/client/grpc/GrpcWindmillServer.java   | 355 
 .../client/grpc/GrpcWindmillStreamFactory.java | 227 
 .../grpc/auth/VendoredCredentialsAdapter.java  |  81 +++
 .../VendoredRequestMetadataCallbackAdapter.java|  51 ++
 .../grpc/observers}/DirectStreamObserver.java  |   2 +-
 .../ForwardingClientResponseObserver.java  |   2 +-
 .../grpc/observers}/StreamObserverFactory.java |   2 +-
 .../client/grpc/stubs/WindmillChannelFactory.java  | 137 +
 .../client/grpc/stubs/WindmillStubFactory.java |  73 +++
 .../throttling/StreamingEngineThrottleTimers.java  |  41 ++
 .../throttling}/ThrottleTimer.java |   6 +-
 .../windmill/grpcclient/GrpcWindmillServer.java| 607 -
 .../worker/windmill/work/WorkItemReceiver.java |  34 ++
 .../worker/windmill/work/budget/GetWorkBudget.java |  98 
 .../dataflow/worker/FakeWindmillServer.java|  24 +-
 .../{ => client}/WindmillStreamPoolTest.java   |   2 +-
 .../grpc}/GrpcGetWorkerMetadataStreamTest.java |   9 +-
 .../grpc}/GrpcWindmillServerTest.java  |  15 +-
 .../windmill/work/budget/GetWorkBudgetTest.java|  72 +++
 37 files changed, 1413 insertions(+), 686 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle 
b/runners/google-cloud-dataflow-java/worker/build.gradle
index ce06063c9b5..1ca9eba2b48 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -89,6 +89,9 @@ applyJavaNature(
 // Allow slf4j implementation worker for logging during 
pipeline execution
 "org/slf4j/impl/**"
 ],
+generatedClassPatterns: [
+/^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/
+],
 shadowClosure: {
 // Each included dependency must also include all of its necessary 
transitive dependencies
 // or have them provided by the users pipeline during job 
submission. Typically a users
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index 33b55647213..0e929249b3a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -29,8 +29,8 @@ import 
org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.

[beam] branch master updated: Refactor StateFetcher (#28755)

2023-10-20 Thread scwhittle
This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new c6ab5fe272f Refactor StateFetcher (#28755)
c6ab5fe272f is described below

commit c6ab5fe272f74749713a5ccf7c21a398d017a606
Author: martin trieu 
AuthorDate: Fri Oct 20 05:20:37 2023 -0700

Refactor StateFetcher (#28755)

Refactor and cleanup of StateFetcher in preparation for future changes
---
 .../beam/runners/dataflow/worker/StateFetcher.java | 291 -
 .../dataflow/worker/StreamingDataflowWorker.java   |  11 +-
 .../worker/StreamingModeExecutionContext.java  | 114 
 .../dataflow/worker/StreamingSideInputFetcher.java |   8 +-
 .../worker/streaming/sideinput/SideInput.java  |  50 
 .../worker/streaming/sideinput/SideInputCache.java | 113 
 .../worker/streaming/sideinput/SideInputState.java |  25 ++
 .../streaming/sideinput/SideInputStateFetcher.java | 245 +
 .../worker/StreamingDataflowWorkerTest.java|   6 +-
 .../worker/StreamingModeExecutionContextTest.java  |   9 +-
 .../worker/StreamingSideInputDoFnRunnerTest.java   |   2 +-
 .../worker/StreamingSideInputFetcherTest.java  |   2 +-
 .../sideinput/SideInputStateFetcherTest.java}  | 170 +++-
 13 files changed, 626 insertions(+), 420 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
deleted file mode 100644
index 0cbcd2e8301..000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker;
-
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
-import java.io.Closeable;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.Materializations.IterableView;
-import org.apache.beam.sdk.transforms.Materializations.MultimapView;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ByteStringOutputStream;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Class responsible for fetching state from the windmill server. */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-class StateFetcher {
-  private static final Set SUPPORTED_MATERIALIZATIONS =
-  ImmutableSet.of(
-  Materializations.ITERABLE_MATERIALIZATION_URN,
-  Materializations.MULTIMAP_MATERIALIZATION_UR