junrao commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2418163965


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -129,6 +148,17 @@ public TopicPartition topicPartition() {
      * Has the request completed?
      */
     public boolean completed() {
-        return this.latch.getCount() == 0L;
+        if (this.latch.getCount() != 0L) {
+            return false;
+        }
+        // are all the dependent results completed?
+        synchronized (dependentResults) {

Review Comment:
   Hmm, this is not quite right. This method is called from
   FutureRecordMetadata. If a batch is split into two batches, b1 and b2, the
   records in b1 don't need to wait for b2 to complete. So it doesn't seem
   that we should change this method. But it would be useful to document the
   semantics: completed() doesn't wait for dependent requests.
    
   ```
       @Override
       public boolean isDone() {
           if (nextRecordMetadata != null)
               return nextRecordMetadata.isDone();
           return this.result.completed();
       }
   
   ```



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -70,7 +73,23 @@ public void done() {
     }
 
     /**
-     * Await the completion of this request
+     * Add a dependent ProduceRequestResult that must complete before this result is considered complete.
+     * This is used when a batch is split into multiple batches - the original batch's result
+     * should not complete until all split batches have completed.
+     *
+     * @param dependentResult The dependent result to wait for
+     */
+    public void addDependentResult(ProduceRequestResult dependentResult) {
+        synchronized (dependentResults) {
+            dependentResults.add(dependentResult);
+        }
+    }
+
+    /**
+     * Await the completion of this request.
+     * Note: This only waits for the local latch and not dependent results.

Review Comment:
   flush() actually calls await() and it needs to wait for all dependent 
requests to complete. Perhaps we could add a separate method like 
awaitAllDependants() and use that in flush().
   ```
       public void awaitFlushCompletion() throws InterruptedException {
           try {
               // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
               // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
               // collection can occur on the contents.
               // The sender will remove ProducerBatch(s) from the original incomplete collection.
               for (ProduceRequestResult result : this.incomplete.requestResults())
                   result.await();
           } finally {
               this.flushesInProgress.decrementAndGet();
           }
       }
   ```
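
   A minimal sketch of how the two semantics could be kept apart. It uses a
   hypothetical stand-in class, `SketchResult`, not the real
   ProduceRequestResult. The method name `awaitAllDependents()` is
   illustrative, and the sketch assumes that split batches are registered
   before the original result is marked done.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CountDownLatch;

   // Hypothetical stand-in for ProduceRequestResult; not the real Kafka class.
   class SketchResult {
       private final CountDownLatch latch = new CountDownLatch(1);
       private final List<SketchResult> dependentResults = new ArrayList<>();

       // Register a result produced by splitting this batch.
       void addDependentResult(SketchResult dependent) {
           synchronized (dependentResults) {
               dependentResults.add(dependent);
           }
       }

       // Mark this result itself as done.
       void done() {
           latch.countDown();
       }

       // Reflects only this result's own latch; dependents are not consulted,
       // so per-record futures are not delayed by sibling split batches.
       boolean completed() {
           return latch.getCount() == 0L;
       }

       // Waits only for this result's own latch.
       void await() throws InterruptedException {
           latch.await();
       }

       // Hypothetical method for flush(): waits for this result and then,
       // recursively, for every dependent result.
       // Assumption: dependents are added before done() is called, so the
       // snapshot taken after await() sees all of them.
       void awaitAllDependents() throws InterruptedException {
           await();
           List<SketchResult> snapshot;
           synchronized (dependentResults) {
               snapshot = new ArrayList<>(dependentResults);
           }
           for (SketchResult dependent : snapshot) {
               dependent.awaitAllDependents();
           }
       }
   }
   ```

   With this shape, awaitFlushCompletion() could call the dependent-aware
   method while FutureRecordMetadata.isDone() keeps using completed()
   unchanged.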



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -33,6 +35,7 @@ public class ProduceRequestResult {
 
     private final CountDownLatch latch = new CountDownLatch(1);
     private final TopicPartition topicPartition;
+    private final List<ProduceRequestResult> dependentResults = new ArrayList<>();

Review Comment:
   This is an existing issue. There is a typo "record set was sent was sent".


