gemini-code-assist[bot] commented on code in PR #38919:
URL: https://github.com/apache/beam/pull/38919#discussion_r3395770317


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -553,7 +700,130 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
       // RestrictionTracker.getProgress() or GetSize() are not defined.
       getOutputBuilder().setSourceBacklogBytes(backlogBytes);
     }
-    return callbacks;
+
+    this.accumulatedCallbacks.putAll(callbacks);
+
+    getOutputBuilder()
+        .setSourceBytesProcessed(computeSourceBytesProcessed(sourceBytesProcessCounterName));
+  }
+
+  private final long computeSourceBytesProcessed(String sourceBytesCounterName) {
+    if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+      return 0L;
+    }
+    HashMap<String, ElementCounter> counters =
+        ((DataflowMapTaskExecutor) workExecutor)
+            .getReadOperation()
+            .receivers[0]
+            .getOutputCounters();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
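   Defensive programming: accessing `receivers[0]` directly, without first checking whether `getReadOperation()` returns null, whether `receivers` is null or empty, or whether `getOutputCounters()` returns null, can throw a `NullPointerException` or an `ArrayIndexOutOfBoundsException`. Adding guards lets the method fall back to returning `0L`, as it already does for executors that are not a `DataflowMapTaskExecutor`, instead of throwing.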
   
   ```java
       DataflowMapTaskExecutor mapTaskExecutor = (DataflowMapTaskExecutor) workExecutor;
       if (mapTaskExecutor.getReadOperation() == null
           || mapTaskExecutor.getReadOperation().receivers == null
           || mapTaskExecutor.getReadOperation().receivers.length == 0) {
         return 0L;
       }
       HashMap<String, ElementCounter> counters =
           mapTaskExecutor.getReadOperation().receivers[0].getOutputCounters();
       if (counters == null) {
         return 0L;
       }
   ```
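For illustration only, here is a minimal, self-contained sketch of the same guard chain. The nested `ReadOp`, `Receiver`, and `Counter` types are hypothetical stand-ins for the worker's read operation, its `receivers` array, and `ElementCounter`; they are not the real Beam classes. The hunk is cut off before the counter is actually read, so looking up a single counter by name is an assumption. The sketch also guards against a null first receiver, which the suggestion above does not do.

```java
import java.util.HashMap;
import java.util.Map;

public class GuardedCounterLookup {

  /** Hypothetical stand-in for ElementCounter: just holds a value. */
  static final class Counter {
    final long value;

    Counter(long value) {
      this.value = value;
    }
  }

  /** Hypothetical stand-in for an output receiver that owns named counters. */
  static final class Receiver {
    private final Map<String, Counter> outputCounters;

    Receiver(Map<String, Counter> outputCounters) {
      this.outputCounters = outputCounters;
    }

    Map<String, Counter> getOutputCounters() {
      return outputCounters;
    }
  }

  /** Hypothetical stand-in for the read operation and its receivers array. */
  static final class ReadOp {
    final Receiver[] receivers;

    ReadOp(Receiver[] receivers) {
      this.receivers = receivers;
    }
  }

  /** Returns the named counter's value, or 0L if any link in the chain is missing. */
  static long sourceBytesProcessed(ReadOp readOp, String counterName) {
    // Same guards as the suggestion: null operation, null or empty receivers.
    if (readOp == null || readOp.receivers == null || readOp.receivers.length == 0) {
      return 0L;
    }
    // Extra guard beyond the suggestion: the first receiver itself may be null.
    Receiver first = readOp.receivers[0];
    if (first == null) {
      return 0L;
    }
    Map<String, Counter> counters = first.getOutputCounters();
    if (counters == null) {
      return 0L;
    }
    // Assumption: the counter of interest is looked up by name.
    Counter counter = counters.get(counterName);
    return counter == null ? 0L : counter.value;
  }

  public static void main(String[] args) {
    Map<String, Counter> counters = new HashMap<>();
    counters.put("source-bytes", new Counter(4096L)); // hypothetical counter name
    ReadOp healthy = new ReadOp(new Receiver[] {new Receiver(counters)});
    System.out.println(sourceBytesProcessed(healthy, "source-bytes")); // 4096
    System.out.println(sourceBytesProcessed(new ReadOp(new Receiver[0]), "source-bytes")); // 0
    System.out.println(sourceBytesProcessed(null, "source-bytes")); // 0
  }
}
```

Each failure mode returns the same `0L` that the method already uses for non-map-task executors, so callers never have to tell "no data" apart from "no counter".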



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -185,6 +242,22 @@ public StreamingModeExecutionContext(
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
     this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
+    this.hotKeyLogger = checkNotNull(hotKeyLogger);
+    this.hotKeyLoggingEnabled = hotKeyLoggingEnabled;
+    this.stepName = checkNotNull(stepName);
+    this.sourceBytesProcessCounterName = checkNotNull(sourceBytesProcessCounterName);
+    this.sideInputStateFetcherFactory = checkNotNull(sideInputStateFetcherFactory);
+
+    // Initialize batch limits from pipeline options
+    String batchSizeStr =
+        ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_SIZE);
+    this.maxKeyGroupBatchSize = batchSizeStr != null ? Integer.parseInt(batchSizeStr) : 100;
+
+    String batchTimeStr =
+        ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS);
+    this.maxKeyGroupBatchTimeNanos =
+        TimeUnit.MILLISECONDS.toNanos(batchTimeStr != null ? Long.parseLong(batchTimeStr) : 100);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Parsing user-provided experiment values with `Integer.parseInt` and `Long.parseLong` without error handling can crash the worker if a value is malformed: both methods throw `NumberFormatException`, which would propagate out of the `StreamingModeExecutionContext` constructor. It is safer to catch the exception, log a warning, and fall back to the default value.
   
   ```java
       String batchSizeStr =
           ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_SIZE);
       int batchSize = 100;
       if (batchSizeStr != null) {
         try {
           batchSize = Integer.parseInt(batchSizeStr);
         } catch (NumberFormatException e) {
           LOG.warn(
               "Failed to parse {} as integer, using default of 100",
               WINDMILL_MAX_KEY_GROUP_BATCH_SIZE,
               e);
         }
       }
       this.maxKeyGroupBatchSize = batchSize;
   
       String batchTimeStr =
           ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS);
       long batchTimeMs = 100;
       if (batchTimeStr != null) {
         try {
           batchTimeMs = Long.parseLong(batchTimeStr);
         } catch (NumberFormatException e) {
           LOG.warn(
               "Failed to parse {} as long, using default of 100",
               WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS,
               e);
         }
       }
       this.maxKeyGroupBatchTimeNanos = TimeUnit.MILLISECONDS.toNanos(batchTimeMs);
   ```
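The two parse blocks in the suggestion repeat the same fallback logic. Below is a minimal sketch of factoring it into one generic helper. `parseOrDefault` is a hypothetical name. `java.util.logging` stands in for the worker's SLF4J `LOG` to keep the example self-contained. The string labels stand in for the values of `WINDMILL_MAX_KEY_GROUP_BATCH_SIZE` and `WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS`, which the hunk does not show.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Logger;

public class ExperimentValueParsing {
  private static final Logger LOG = Logger.getLogger(ExperimentValueParsing.class.getName());

  /**
   * Returns parser.apply(raw), or defaultValue when raw is null or cannot be parsed.
   * The name argument is only used in the warning message.
   */
  static <T> T parseOrDefault(
      String name, String raw, Function<String, T> parser, T defaultValue) {
    if (raw == null) {
      return defaultValue; // Experiment not set: use the default without warning.
    }
    try {
      return parser.apply(raw);
    } catch (NumberFormatException e) {
      // Malformed value: warn and fall back instead of failing construction.
      LOG.warning("Failed to parse " + name + "=" + raw + ", using default of " + defaultValue);
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    // Hypothetical raw strings, standing in for getExperimentValue results.
    int batchSize =
        parseOrDefault("WINDMILL_MAX_KEY_GROUP_BATCH_SIZE", "250", Integer::valueOf, 100);
    long batchTimeMs =
        parseOrDefault("WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS", "10ms", Long::valueOf, 100L);
    long batchTimeNanos = TimeUnit.MILLISECONDS.toNanos(batchTimeMs);
    System.out.println(batchSize); // 250
    System.out.println(batchTimeNanos); // 100000000
  }
}
```

Passing `Integer::valueOf` for the batch size keeps the range check in the parser, which throws `NumberFormatException` for values outside the `int` range, rather than parsing as `long` and narrowing afterwards.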



-- 
This is an automated message from the Apache Git Service.