junrao commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2418163965
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -129,6 +148,17 @@ public TopicPartition topicPartition() {
* Has the request completed?
*/
public boolean completed() {
- return this.latch.getCount() == 0L;
+ if (this.latch.getCount() != 0L) {
+ return false;
+ }
+ // are all the dependent results completed?
+ synchronized (dependentResults) {
Review Comment:
Hmm, this is not quite right. This method is called from
FutureRecordMetadata. If a batch is split into two batches b1 and b2,
records in b1 don't need to wait for the completion of b2. So it
doesn't seem that we should change this method. But it would be useful to
document the semantics: this method does not wait for dependent requests.
```
@Override
public boolean isDone() {
if (nextRecordMetadata != null)
return nextRecordMetadata.isDone();
return this.result.completed();
}
```
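The semantics described above can be illustrated with a minimal, self-contained sketch. This is a hypothetical simplification, not the actual Kafka producer internals: it models only the latch and the dependent-result list, and shows why `completed()` should reflect only the local latch, so records in an already-acked split batch report completion without waiting for the other split batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of ProduceRequestResult (names and fields
// assumed for illustration only).
class ProduceRequestResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final List<ProduceRequestResultSketch> dependentResults = new ArrayList<>();

    public void done() {
        latch.countDown();
    }

    public void addDependentResult(ProduceRequestResultSketch dependent) {
        synchronized (dependentResults) {
            dependentResults.add(dependent);
        }
    }

    /**
     * Has this request completed? Note: this intentionally does NOT wait for
     * dependent (split-batch) results, because records in one split batch
     * should not block on the completion of the other.
     */
    public boolean completed() {
        return latch.getCount() == 0L;
    }

    public static void main(String[] args) {
        ProduceRequestResultSketch original = new ProduceRequestResultSketch();
        ProduceRequestResultSketch b1 = new ProduceRequestResultSketch();
        ProduceRequestResultSketch b2 = new ProduceRequestResultSketch();
        original.addDependentResult(b1);
        original.addDependentResult(b2);

        b1.done(); // b1 is acked; b2 is still in flight
        System.out.println("b1 completed: " + b1.completed());
        System.out.println("b2 completed: " + b2.completed());
    }
}
```

Here `b1.completed()` is true even though `b2` has not completed, which is exactly the behavior the comment suggests documenting.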
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -70,7 +73,23 @@ public void done() {
}
/**
- * Await the completion of this request
+ * Add a dependent ProduceRequestResult that must complete before this result is considered complete.
+ * This is used when a batch is split into multiple batches - the original batch's result
+ * should not complete until all split batches have completed.
+ *
+ * @param dependentResult The dependent result to wait for
+ */
+ public void addDependentResult(ProduceRequestResult dependentResult) {
+ synchronized (dependentResults) {
+ dependentResults.add(dependentResult);
+ }
+ }
+
+ /**
+ * Await the completion of this request.
+ * Note: This only waits for the local latch and not dependent results.
Review Comment:
flush() actually calls await() and it needs to wait for all dependent
requests to complete. Perhaps we could add a separate method like
awaitAllDependants() and use that in flush().
```
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
        // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
        // collection can occur on the contents.
        // The sender will remove ProducerBatch(s) from the original incomplete collection.
        for (ProduceRequestResult result : this.incomplete.requestResults())
            result.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
```
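The suggested separate method could look like the sketch below. This is a hypothetical, self-contained illustration (the method name `awaitAllDependants()` comes from the comment; the class and fields are assumptions, not the real producer code): it blocks on the local latch and then recursively on every dependent result, which is the behavior `flush()` needs, while the existing `await()` keeps its local-only semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: awaitAllDependants() waits for this result AND all
// dependent (split-batch) results, for use from flush().
class AwaitDependantsSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final List<AwaitDependantsSketch> dependentResults = new ArrayList<>();

    void done() {
        latch.countDown();
    }

    void addDependentResult(AwaitDependantsSketch dependent) {
        synchronized (dependentResults) {
            dependentResults.add(dependent);
        }
    }

    // Existing semantics: wait for the local latch only.
    void await() throws InterruptedException {
        latch.await();
    }

    // Suggested semantics: wait for this result and, recursively, all
    // dependent results (a split batch may itself be split again).
    void awaitAllDependants() throws InterruptedException {
        latch.await();
        List<AwaitDependantsSketch> snapshot;
        synchronized (dependentResults) {
            snapshot = new ArrayList<>(dependentResults);
        }
        for (AwaitDependantsSketch dependent : snapshot)
            dependent.awaitAllDependants();
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitDependantsSketch original = new AwaitDependantsSketch();
        AwaitDependantsSketch split = new AwaitDependantsSketch();
        original.addDependentResult(split);
        original.done();

        // Complete the split batch from another thread after a short delay.
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            split.done();
        }).start();

        original.awaitAllDependants(); // returns only once the split batch is done too
        System.out.println("split completed: " + split.completed());
    }

    boolean completed() {
        return latch.getCount() == 0L;
    }
}
```

Taking a snapshot of the dependent list inside the lock, then awaiting outside it, avoids holding the monitor while blocked.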
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java:
##########
@@ -33,6 +35,7 @@ public class ProduceRequestResult {
private final CountDownLatch latch = new CountDownLatch(1);
private final TopicPartition topicPartition;
+ private final List<ProduceRequestResult> dependentResults = new ArrayList<>();
Review Comment:
This is an existing issue: there is a typo, "record set was sent was sent".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]