gemini-code-assist[bot] commented on code in PR #38863:
URL: https://github.com/apache/beam/pull/38863#discussion_r3412187796


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -166,7 +191,7 @@ private void flushInternal() {
     }
     Elements.Builder elements = convertBufferForTransmission();
     if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
-      outboundObserver.onNext(elements.build());
+      checkNotNull(outboundObserver).onNext(elements.build());
     }
     hasFlushedForBundle = true;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Calling `discard()` or `finishInstruction()` sets both `instructionId` and `outboundObserver` to `null`. If the periodic flushing thread runs concurrently with, or shortly after, either call, it still invokes `flushInternal()`. That method then either calls `convertBufferForTransmission()`, whose `setInstructionId(instructionId)` throws an NPE when `instructionId` is null, or reaches `checkNotNull(outboundObserver)`, which also throws an NPE. Returning early when either field is null prevents both exceptions.
   
   ```java
       // The bundle may already have been discarded or finished; there is nothing to flush.
       if (instructionId == null || outboundObserver == null) {
         return;
       }
       Elements.Builder elements = convertBufferForTransmission();
       if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
         outboundObserver.onNext(elements.build());
       }
       hasFlushedForBundle = true;
   ```
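The sketch below reproduces the guard in isolation. `FlushGuardSketch`, its methods, and the use of a `Consumer<String>` in place of the gRPC `StreamObserver` are all hypothetical. The sketch also assumes that `flush()` and `discard()` take the same lock, which is what makes the null check reliable: without a shared lock, checking a field and then using it can still race with another thread that clears it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for an outbound aggregator whose per-bundle fields are cleared when the
 * bundle is finished or discarded. A Consumer plays the role of the outbound StreamObserver.
 */
final class FlushGuardSketch {
  private final Object flushLock = new Object();
  private final List<String> buffered = new ArrayList<>();
  private String instructionId; // null outside of a bundle
  private Consumer<String> outboundObserver; // null outside of a bundle

  void startBundle(String id, Consumer<String> observer) {
    synchronized (flushLock) {
      instructionId = id;
      outboundObserver = observer;
    }
  }

  void add(String element) {
    synchronized (flushLock) {
      buffered.add(element);
    }
  }

  /** Plays the role of discard()/finishInstruction(): clears all per-bundle state. */
  void discard() {
    synchronized (flushLock) {
      instructionId = null;
      outboundObserver = null;
      buffered.clear();
    }
  }

  /** Called by a periodic flush thread; becomes a no-op once the bundle is gone. */
  void flush() {
    synchronized (flushLock) {
      // With flushLock held, discard() cannot clear these fields between the check and the use.
      String id = instructionId;
      Consumer<String> observer = outboundObserver;
      if (id == null || observer == null) {
        return; // Bundle already finished or discarded: nothing to send.
      }
      for (String element : buffered) {
        observer.accept(id + ":" + element);
      }
      buffered.clear();
    }
  }
}
```

A flush that fires after `discard()` now returns quietly instead of throwing, and any elements still buffered for the discarded bundle are dropped rather than sent.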



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java:
##########
@@ -44,17 +46,42 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
 
-  private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer>
-      multiplexerCache;
+  private static class MultiplexerKey {
+    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+    private final String dataStreamId;
+
+    private MultiplexerKey(
+        Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+      this.apiServiceDescriptor = apiServiceDescriptor;
+      this.dataStreamId = dataStreamId;
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   In `MultiplexerKey`, `dataStreamId` is compared using `Objects.equals`. If one caller passes `null` and another passes `""` (the empty string), the two keys compare as different, so the cache creates duplicate multiplexers for the same default stream. Normalizing a null `dataStreamId` to `""` in the constructor makes both spellings resolve to the same key.
   
   ```suggestion
       private MultiplexerKey(
           Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable String dataStreamId) {
         this.apiServiceDescriptor = apiServiceDescriptor;
         this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
       }
   ```
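For the normalization to pay off, `equals` and `hashCode` must both use the normalized field. The following is a minimal sketch of such a key; `MultiplexerKeySketch` is a hypothetical name, and a plain `String` endpoint stands in for `Endpoints.ApiServiceDescriptor` (an assumption made only to keep the example self-contained).

```java
import java.util.Objects;

/** Hypothetical cache key; a String endpoint stands in for Endpoints.ApiServiceDescriptor. */
final class MultiplexerKeySketch {
  private final String endpoint;
  private final String dataStreamId; // never null: a null id is normalized to ""

  MultiplexerKeySketch(String endpoint, String dataStreamId) {
    this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
    this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MultiplexerKeySketch)) {
      return false;
    }
    MultiplexerKeySketch other = (MultiplexerKeySketch) o;
    // Both fields are non-null here, so plain equals() is safe.
    return endpoint.equals(other.endpoint) && dataStreamId.equals(other.dataStreamId);
  }

  @Override
  public int hashCode() {
    // Consistent with equals(): null and "" ids hash identically after normalization.
    return Objects.hash(endpoint, dataStreamId);
  }
}
```

With this key, a `computeIfAbsent` on a `ConcurrentMap` with a `null` id and a second one with `""` for the same endpoint return the same cached value, so only one multiplexer would be created for the default stream.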



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -179,12 +187,13 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex
     } catch (Exception e) {
       System.out.format("Problem loading pipeline options from file: %s%n", e.getMessage());
     }
-
-    System.out.format("Pipeline options %s%n", pipelineOptionsJson);
-    // TODO: https://github.com/apache/beam/issues/30301
-    pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
-
-    PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
+    if (pipelineOptionsJson != null) {
+      System.out.format("Pipeline options %s%n", pipelineOptionsJson);
+      // TODO: https://github.com/apache/beam/issues/30301
+      pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
+    }
+    PipelineOptions options =
+        PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `pipelineOptionsJson` is null, passing `""` (the empty string) to `PipelineOptionsTranslation.fromJson` will likely throw a JSON parsing exception, because an empty string is not valid JSON. Passing `"{}"` (an empty JSON object) is the standard way to represent empty options and parses cleanly.
   
   ```suggestion
       PipelineOptions options =
           PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "{}" : pipelineOptionsJson);
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -207,34 +233,43 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
       TimerEndpoint timerKey = entry.getKey();
       bufferedElements
           .addTimersBuilder()
-          .setInstructionId(processBundleRequestIdSupplier.get())
+          .setInstructionId(instructionId)
           .setTransformId(timerKey.pTransformId)
           .setTimerFamilyId(timerKey.timerFamilyId)
           .setIsLast(true);
       entry.getValue().resetStats();
     }
+    // This is the end of the bundle so we reset state to prepare for future bundles.
     if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
       return bufferedElements.build();
     }
-    outboundObserver.onNext(bufferedElements.build());
-    // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
-    // bundles.
+    checkNotNull(outboundObserver).onNext(bufferedElements.build());
     hasFlushedForBundle = false;
     return null;
   }
 
   // Send the elements to the StreamObserver associated with this aggregator.
   public void sendElements(Elements elements) {
-    outboundObserver.onNext(elements);
+    checkNotNull(outboundObserver).onNext(elements);
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `sendElements` is called by the bundle-processing thread and writes directly to `outboundObserver` without acquiring `flushLock`. If `timeLimit > 0`, the background flushing thread can call `flushInternal()` at the same time and write to the same `outboundObserver`. gRPC `StreamObserver` implementations are not required to be thread-safe, so concurrent calls to `onNext` can corrupt the stream or crash it. Synchronizing on `flushLock` when `timeLimit > 0` prevents this race.
   
   ```java
     public void sendElements(Elements elements) {
       // Only with timeLimit > 0 can the periodic flush thread write concurrently; share its lock.
       if (timeLimit > 0) {
         synchronized (flushLock) {
           checkNotNull(outboundObserver).onNext(elements);
         }
       } else {
         checkNotNull(outboundObserver).onNext(elements);
       }
     }
   ```
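The standalone sketch below shows why routing every write through one lock is sufficient. `OverlapDetectingSink` is a hypothetical stand-in for a gRPC `StreamObserver` that counts how often two threads are inside `onNext` at once, and `LockedWriterSketch` is a hypothetical writer; its two threads stand in for the bundle-processing thread and the periodic flush thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sink that records overlapping onNext calls, which a StreamObserver forbids. */
final class OverlapDetectingSink {
  private final AtomicBoolean inOnNext = new AtomicBoolean();
  final AtomicInteger overlaps = new AtomicInteger();
  final AtomicInteger received = new AtomicInteger();

  void onNext(String value) {
    if (!inOnNext.compareAndSet(false, true)) {
      overlaps.incrementAndGet(); // Another thread is inside onNext right now.
      return;
    }
    try {
      received.incrementAndGet();
    } finally {
      inOnNext.set(false);
    }
  }
}

/** Hypothetical writer: every caller funnels through flushLock before touching the sink. */
final class LockedWriterSketch {
  private final Object flushLock = new Object();
  private final OverlapDetectingSink sink;

  LockedWriterSketch(OverlapDetectingSink sink) {
    this.sink = sink;
  }

  void send(String value) {
    synchronized (flushLock) {
      sink.onNext(value);
    }
  }

  /** Two threads stand in for the bundle-processing thread and the periodic flush thread. */
  static OverlapDetectingSink runTwoWriters(int writesPerThread) {
    OverlapDetectingSink sink = new OverlapDetectingSink();
    LockedWriterSketch writer = new LockedWriterSketch(sink);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int t = 0; t < 2; t++) {
      pool.execute(
          () -> {
            for (int i = 0; i < writesPerThread; i++) {
              writer.send("element");
            }
          });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("writers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for writers", e);
    }
    return sink;
  }
}
```

Because both writers take `flushLock`, `overlaps` stays at zero; removing the `synchronized` block lets the threads interleave, although a given run may still happen to show no overlap. One alternative worth weighing, offered only as a suggestion, is to lock unconditionally rather than branching on `timeLimit`: an uncontended `synchronized` block is cheap and keeps a single code path.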



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java:
##########
@@ -91,25 +119,32 @@ public void poisonInstructionId(String instructionId) {
   }
 
   @Override
-  public BeamFnDataOutboundAggregator createOutboundAggregator(
-      ApiServiceDescriptor apiServiceDescriptor,
-      Supplier<String> processBundleRequestIdSupplier,
-      boolean collectElementsIfNoFlushes) {
-    return new BeamFnDataOutboundAggregator(
-        options,
-        processBundleRequestIdSupplier,
-        getClientFor(apiServiceDescriptor).getOutboundObserver(),
-        collectElementsIfNoFlushes);
+  public StreamObserver<Elements> getOutboundObserver(
+      ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+    return getMultiplexer(apiServiceDescriptor, dataStreamId).getOutboundObserver();
   }
 
-  private BeamFnDataGrpcMultiplexer getClientFor(
-      Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
+  private BeamFnDataGrpcMultiplexer getMultiplexer(
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+    MultiplexerKey key = new MultiplexerKey(apiServiceDescriptor, dataStreamId);
     return multiplexerCache.computeIfAbsent(
-        apiServiceDescriptor,
-        (Endpoints.ApiServiceDescriptor descriptor) ->
-            new BeamFnDataGrpcMultiplexer(
-                descriptor,
-                outboundObserverFactory,
-                BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data));
+        key,
+        k -> {
+          OutboundObserverFactory.BasicFactory<Elements, Elements> baseOutboundObserverFactory =
+              inboundObserver -> {
+                BeamFnDataGrpc.BeamFnDataStub stub =
+                    BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor));
+                if (dataStreamId != null && !dataStreamId.isEmpty()) {
+                  Metadata headers = new Metadata();
+                  headers.put(
+                      Metadata.Key.of("data_stream_id", Metadata.ASCII_STRING_MARSHALLER),
+                      dataStreamId);
+                  stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
+                }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   With `dataStreamId` normalized in `MultiplexerKey`, the check in `getMultiplexer` can be simplified to `!key.dataStreamId.isEmpty()`, which drops the null check and reads the normalized key field instead of the raw parameter.
   
   ```suggestion
                   if (!key.dataStreamId.isEmpty()) {
                     Metadata headers = new Metadata();
                     headers.put(
                         Metadata.Key.of("data_stream_id", Metadata.ASCII_STRING_MARSHALLER),
                         key.dataStreamId);
                     stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
