arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1814402394


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -69,7 +69,9 @@ final class GrpcGetDataStream
   private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
       StreamingGetDataRequest.newBuilder().build();
 
+  /** @implNote insertion and removal is guarded by {@link #shutdownLock} */

Review Comment:
   ```suggestion
     /** @implNote QueuedBatch objects in the queue are guarded by {@link #shutdownLock} */
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -363,7 +363,7 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
     QueuedBatch batch;
     boolean responsibleForSend = false;
     @Nullable QueuedBatch prevBatch = null;
-    synchronized (batches) {
+    synchronized (shutdownLock) {
       batch = batches.isEmpty() ? null : batches.getLast();

Review Comment:
   We need to check isShutdown() and return early before accessing `batches`
   in every synchronized block that guards `batches`; otherwise those blocks
   can still race with shutdown.
   
   E.g., a problematic sequence:
   1. Thread1 is on line 366, waiting to acquire shutdownLock.
   2. Thread2 executes the shutdown logic and clears all the pending batches.
   3. Thread1 now acquires shutdownLock here and tries to create a new batch,
      even though shutdown has already cleared the queue.
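
A minimal sketch of the check-then-access pattern described above. It is not the actual GrpcGetDataStream code: the class `ShutdownGuardedQueue`, its `enqueue`/`pendingCount` methods, and the plain boolean `isShutdown` field are hypothetical stand-ins, and the real stream exposes `isShutdown()` as a method whose implementation is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the stream; illustrates the locking pattern only. */
final class ShutdownGuardedQueue {
  private final Object shutdownLock = new Object();
  // Both fields below are only read or written while holding shutdownLock.
  private final Deque<String> batches = new ArrayDeque<>();
  private boolean isShutdown = false;

  /** Returns false, without touching the queue, if shutdown has already run. */
  boolean enqueue(String request) {
    synchronized (shutdownLock) {
      // Re-check after acquiring the lock: shutdown() may have completed
      // while this thread was blocked waiting for shutdownLock.
      if (isShutdown) {
        return false;
      }
      batches.addLast(request);
      return true;
    }
  }

  /** Marks the queue as shut down and drops everything still pending. */
  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
      batches.clear();
    }
  }

  int pendingCount() {
    synchronized (shutdownLock) {
      return batches.size();
    }
  }
}
```

Because the flag is set and the queue is cleared under the same lock that `enqueue` takes, a thread that was blocked on the lock during shutdown observes `isShutdown == true` and returns before it can add a batch that nothing would ever drain.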



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -225,123 +225,111 @@ private void flushInternal(Map<Long, PendingRequest> requests) {
   }
 
   private void issueSingleRequest(long id, PendingRequest pendingRequest) {
-    if (isPrepareForSendFailed(id, pendingRequest)) {
+    if (prepareForSend(id, pendingRequest)) {

Review Comment:
   ```suggestion
       if (!prepareForSend(id, pendingRequest)) {
         pendingRequest.abort();
       }
   ```
   
   This reduces nesting.
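
A sketch of how a guard clause flattens the method, under one reading of the suggestion. The early `return` after `abort()` is an assumption (the suggestion itself does not show it), and `GuardClauseSketch`, `FakeRequest`, and `send()` are hypothetical names used only for illustration, not the actual GrpcCommitWorkStream code.

```java
import java.util.ArrayList;
import java.util.List;

final class GuardClauseSketch {
  /** Hypothetical stand-in for PendingRequest; records what happened to it. */
  static final class FakeRequest {
    final boolean preparable;
    final List<String> events = new ArrayList<>();

    FakeRequest(boolean preparable) {
      this.preparable = preparable;
    }

    void abort() {
      events.add("abort");
    }

    void send() {
      events.add("send");
    }
  }

  /** Hypothetical: returns true when the request is ready to be sent. */
  static boolean prepareForSend(long id, FakeRequest request) {
    return request.preparable;
  }

  static void issueSingleRequest(long id, FakeRequest request) {
    // Guard clause: handle the failure case first and leave early.
    if (!prepareForSend(id, request)) {
      request.abort();
      return; // assumption: not shown in the suggestion above
    }
    // The send path now sits at the top nesting level of the method.
    request.send();
  }
}
```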
   
   
   


