gemini-code-assist[bot] commented on code in PR #38458:
URL: https://github.com/apache/beam/pull/38458#discussion_r3224857921
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -651,6 +653,17 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
             windmillStateCache::forComputation,
             ID_GENERATOR));
+    Fetcher configedFetcher =
+        configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configedFetcher
Review Comment:

The variable name `configedFetcher` contains a typo. "Configed" is not a
standard English word; `fetcher` or `configuredFetcher` would be more
appropriate.
```suggestion
    Fetcher fetcher =
        configFetcherComputationStateCacheAndWindmillClient.configFetcher();
    fetcher
```
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
   void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
+  /**
+   * The maximum size of cached values in bytes. Values larger than this limit will not be cached by
+   * the windmill state cache
+   */
+  @Description("The maximum size of cached values in bytes.")
Review Comment:

The description "maximum size of cached values" is slightly ambiguous
because the implementation checks against the entry's weight, which includes
cache overhead (e.g., approximately 136 bytes per entry). It would be clearer
to specify that this limit includes overhead so users can set appropriate
values.
```suggestion
  /**
   * The maximum size of cached values in bytes (including cache overhead). Values larger than this
   * limit will not be cached by the windmill state cache.
   */
  @Description("The maximum size of cached values in bytes (including cache overhead).")
```
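For illustration, the weight check this comment describes could be sketched as follows. This is an assumption-laden sketch, not the actual Beam implementation: the class and method names are invented, and the 136-byte per-entry overhead is taken directly from the approximate figure in the comment above.

```java
// Illustrative sketch of comparing a value against a size limit that
// includes per-entry cache overhead. Not the Beam/Guava implementation;
// the overhead constant is the approximate figure from the review comment.
public class WeightCheckSketch {
  static final long ASSUMED_PER_ENTRY_OVERHEAD_BYTES = 136L;

  // Returns true if a value of the given size fits under the configured
  // limit once per-entry overhead is counted; a negative limit disables
  // the check entirely in this sketch.
  static boolean shouldCache(long valueBytes, long maxCachedValueBytes) {
    if (maxCachedValueBytes < 0) {
      return true;
    }
    return valueBytes + ASSUMED_PER_ENTRY_OVERHEAD_BYTES <= maxCachedValueBytes;
  }

  public static void main(String[] args) {
    // A 2000-byte value weighs 2136 bytes, so a 2000-byte limit rejects it.
    System.out.println(shouldCache(2000, 2000)); // false
    System.out.println(shouldCache(1000, 2000)); // true
  }
}
```

This is why documenting the limit as "including cache overhead" matters: a user who sets the limit to exactly their value size would see those values rejected.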
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -689,7 +702,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
     return new StreamingDataflowWorker(
         windmillServer,
         clientId,
-        configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
+        configedFetcher,
Review Comment:

Updating the variable name to match the suggested change for the fetcher
definition.
```suggestion
        fetcher,
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -651,6 +653,17 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
             windmillStateCache::forComputation,
             ID_GENERATOR));
+    Fetcher configedFetcher =
+        configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+    configedFetcher
+        .getGlobalConfigHandle()
+        .registerConfigObserver(
+            config -> {
+              if (config.userWorkerJobSettings().hasMaxCachedValueBytes()) {
+                windmillStateCache.setMaxCachedValueBytesOverride(
+                    config.userWorkerJobSettings().getMaxCachedValueBytes());
+              }
+            });
Review Comment:

The configuration observer only updates the override when the field is
present in the dynamic configuration. If the setting is subsequently removed
from the dynamic configuration, the worker will continue to use the last
received override instead of reverting to the default value from the pipeline
options. Adding an `else` block to reset the override to `-1L` ensures the
worker correctly handles the removal of the dynamic setting.
```suggestion
            config -> {
              if (config.userWorkerJobSettings().hasMaxCachedValueBytes()) {
                windmillStateCache.setMaxCachedValueBytesOverride(
                    config.userWorkerJobSettings().getMaxCachedValueBytes());
              } else {
                windmillStateCache.setMaxCachedValueBytesOverride(-1L);
              }
            });
```
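The override/reset semantics this suggestion relies on can be sketched as follows. This is a hypothetical illustration, not the Beam `WindmillStateCache` code: the class name, fields, and the convention that a negative override means "fall back to the pipeline-option default" are assumptions based on the `-1L` reset described in the comment above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a dynamically overridable limit that reverts to a
// static default (from pipeline options) when the override is cleared with
// a negative value. Names are illustrative, not the Beam implementation.
public class StateCacheSketch {
  private final long defaultMaxCachedValueBytes; // from pipeline options
  private final AtomicLong override = new AtomicLong(-1L); // -1 = no override

  StateCacheSketch(long defaultMaxCachedValueBytes) {
    this.defaultMaxCachedValueBytes = defaultMaxCachedValueBytes;
  }

  void setMaxCachedValueBytesOverride(long bytes) {
    override.set(bytes);
  }

  long effectiveMaxCachedValueBytes() {
    long o = override.get();
    return o >= 0 ? o : defaultMaxCachedValueBytes;
  }

  public static void main(String[] args) {
    StateCacheSketch cache = new StateCacheSketch(1024);
    System.out.println(cache.effectiveMaxCachedValueBytes()); // 1024
    cache.setMaxCachedValueBytesOverride(2048); // dynamic config applies
    System.out.println(cache.effectiveMaxCachedValueBytes()); // 2048
    cache.setMaxCachedValueBytesOverride(-1L);  // dynamic setting removed
    System.out.println(cache.effectiveMaxCachedValueBytes()); // back to 1024
  }
}
```

Without the `else` branch in the suggestion, the third step never happens: once a dynamic value has been observed, the worker keeps it forever even after the setting disappears from the config.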
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]