acrites commented on code in PR #31822:
URL: https://github.com/apache/beam/pull/31822#discussion_r1690391609
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -572,10 +597,32 @@ static StreamingDataflowWorker forTesting(
   private static void onPipelineConfig(
       StreamingEnginePipelineConfig config,
       Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
-      AtomicInteger maxWorkItemCommitBytes) {
-    if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
-      LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
-      maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
+      AtomicReference<OperationalLimits> operationalLimits) {
+    if (config.maxWorkItemCommitBytes() != operationalLimits.get().maxWorkItemCommitBytes) {

Review Comment:
   Let me know if this is what you intended. I'm now just setting the `OperationalLimits` based on whatever was set in the `StreamingEnginePipelineConfig`. Any values not explicitly set by the caller will use whatever the config's default values are, so at least in this block of code the `OperationalLimits`'s own default values never come into play. The `StreamingEnginePipelineConfig` isn't a proto, so I can't tell whether a value was explicitly set or not (there are no `hasFoo()` methods).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
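   For anyone following the thread, here is a minimal sketch of the pattern being discussed, using hypothetical stand-in classes rather than the real Beam types. Only the `maxWorkItemCommitBytes` accessor/field and the `AtomicReference<OperationalLimits>` swap come from the diff above; the constructors, default value, and `main` driver are assumed purely for illustration. The point it demonstrates: the limits object is rebuilt unconditionally from whatever the config carries, so for any value the caller never set, the config's defaults win and the limits type's own defaults never apply.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical stand-in for the Beam-internal OperationalLimits type.
   final class OperationalLimits {
     final long maxWorkItemCommitBytes;

     OperationalLimits(long maxWorkItemCommitBytes) {
       this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
     }
   }

   // Hypothetical stand-in for StreamingEnginePipelineConfig. It is not a
   // proto, so there is no hasMaxWorkItemCommitBytes(): an unset value is
   // indistinguishable from the config's own default.
   final class StreamingEnginePipelineConfig {
     private final long maxWorkItemCommitBytes;

     StreamingEnginePipelineConfig(long maxWorkItemCommitBytes) {
       this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
     }

     long maxWorkItemCommitBytes() {
       return maxWorkItemCommitBytes;
     }
   }

   public class OnPipelineConfigSketch {
     // Rebuild the limits from the config and swap them in atomically.
     // Whatever the caller did not set arrives here as the config's default,
     // so OperationalLimits' own defaults never come into play.
     static void onPipelineConfig(
         StreamingEnginePipelineConfig config,
         AtomicReference<OperationalLimits> operationalLimits) {
       if (config.maxWorkItemCommitBytes()
           != operationalLimits.get().maxWorkItemCommitBytes) {
         operationalLimits.set(new OperationalLimits(config.maxWorkItemCommitBytes()));
       }
     }

     public static void main(String[] args) {
       // Illustrative numbers only, not the real Dataflow defaults.
       AtomicReference<OperationalLimits> limits =
           new AtomicReference<>(new OperationalLimits(180L << 20));
       onPipelineConfig(new StreamingEnginePipelineConfig(10L << 20), limits);
       System.out.println(limits.get().maxWorkItemCommitBytes); // 10485760
     }
   }
   ```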