junrao commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2421417011


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -129,6 +148,17 @@ public TopicPartition topicPartition() {
      * Has the request completed?
      */
     public boolean completed() {
-        return this.latch.getCount() == 0L;
+        if (this.latch.getCount() != 0L) {
+            return false;
+        }
+        // are all the dependent results completed?
+        synchronized (dependentResults) {

Review Comment:
   @shashankhs11 : This is a bit confusing. I am suggesting three changes.
   1. ProduceRequestResult.completed() is currently only called by 
FutureRecordMetadata.isDone(). Since FutureRecordMetadata doesn't need to wait 
for all of the split ProduceRequestResults to complete, we want 
ProduceRequestResult.completed() to stay as it is and depend only on the result 
from before the split.
   2. Introduce a new method like ProduceRequestResult.awaitAllDependants() and 
change flush() to call that.
   3. Write a unit test that makes sure 
ProduceRequestResult.awaitAllDependants() waits until all dependents complete.
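
A rough sketch of the shape these three points could take, for illustration only. SplitAwareResult is a hypothetical stand-in, not the real ProduceRequestResult; addDependent(), the dependentResults list, and waitsForDependents() are assumed names, and the sketch assumes dependents are registered before the parent is marked done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for ProduceRequestResult; not the actual Kafka class.
class SplitAwareResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    // Results created when the original batch was split (assumed field name).
    private final List<SplitAwareResult> dependentResults = new ArrayList<>();

    void addDependent(SplitAwareResult dependent) {
        synchronized (dependentResults) {
            dependentResults.add(dependent);
        }
    }

    void done() {
        latch.countDown();
    }

    // Point 1: unchanged semantics, depends only on this result's own latch.
    boolean completed() {
        return latch.getCount() == 0L;
    }

    // Point 2: what flush() would call; waits for this result and, recursively,
    // for every dependent created by a split.
    void awaitAllDependants() throws InterruptedException {
        latch.await();
        List<SplitAwareResult> snapshot;
        synchronized (dependentResults) {
            // Copy under the lock so we never block while holding it.
            snapshot = new ArrayList<>(dependentResults);
        }
        for (SplitAwareResult dependent : snapshot) {
            dependent.awaitAllDependants();
        }
    }

    // Point 3: the kind of check a unit test could make.
    static boolean waitsForDependents() {
        SplitAwareResult parent = new SplitAwareResult();
        SplitAwareResult child = new SplitAwareResult();
        parent.addDependent(child);
        parent.done();
        Thread waiter = new Thread(() -> {
            try {
                parent.awaitAllDependants();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join(100);
            // The parent alone is complete, but the waiter is still blocked on the child.
            boolean parentDoneEarly = parent.completed();
            boolean blockedOnChild = waiter.isAlive();
            child.done();
            waiter.join(5000);
            return parentDoneEarly && blockedOnChild && !waiter.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("waits for dependents: " + waitsForDependents());
    }
}
```

Keeping completed() untouched means isDone() on a future still reflects only the pre-split result, while the separate await method gives flush() the stronger guarantee.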



