junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2593484432
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -354,7 +317,8 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
return false;
}
- private long sendProducerData(long now) {
+ // Visible for testing
+ protected long sendProducerData(long now) {
Review Comment:
The bug could also manifest as a different record batch being sent to the
broker. In UnifiedLog.trimInvalidBytes(), we trim extra bytes from the record
batch. On the leader side, we don't expect the record batch to have extra
bytes. It would be useful to log a warning if this is not the case.
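A minimal sketch of the suggested leader-side check (names like `totalBytes`/`validBytes` are illustrative stand-ins for the real record-batch accessors, not the UnifiedLog API):

```java
// Illustrative only: on the leader we expect trimInvalidBytes() to be a no-op,
// so any trimmed bytes are worth a warning. All names here are hypothetical.
public class TrimCheck {
    static String checkTrim(boolean isLeader, int totalBytes, int validBytes) {
        if (validBytes < 0 || validBytes > totalBytes)
            throw new IllegalArgumentException("bad validBytes: " + validBytes);
        int trimmed = totalBytes - validBytes;
        if (isLeader && trimmed > 0)
            return "WARN: trimming " + trimmed + " extra bytes from a leader-side batch";
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(checkTrim(true, 100, 100)); // no extra bytes: OK
        System.out.println(checkTrim(true, 100, 90));  // unexpected on the leader
    }
}
```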
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java:
##########
@@ -48,7 +48,7 @@ public class BufferPool {
private final long totalMemory;
private final int poolableSize;
- private final ReentrantLock lock;
+ protected final ReentrantLock lock;
Review Comment:
Is this change needed?
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2533,10 +2534,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep
time.sleep(deliveryTimeoutMs);
sender.runOnce(); // receive first response
assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");
- assertInstanceOf(
-     TimeoutException.class,
-     assertThrows(ExecutionException.class, request::get).getCause(),
-     "The expired batch should throw a TimeoutException");
+ assertDoesNotThrow(() -> request.get());
Review Comment:
Instead of a new integration test, I am wondering if we could just extend
this test for coverage. We can create an inflight request, but not complete the
request. Let the delivery timeout expire and check that the batch is not
completed.
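The suggested flow could look roughly like the toy simulation below (standalone, with hypothetical names; the real test would drive SenderTest's sender and accumulator instead):

```java
// Toy model of the suggested coverage: a batch whose request is still
// inflight must not be completed when the delivery timeout elapses.
public class InflightExpirySim {
    static final long DELIVERY_TIMEOUT_MS = 1000;

    static class Batch {
        long createdMs;
        boolean inflight;
        boolean completed;
    }

    // Mirrors the PR's change: expiry skips batches that are still inflight.
    static void maybeExpire(Batch b, long nowMs) {
        if (!b.inflight && nowMs - b.createdMs >= DELIVERY_TIMEOUT_MS)
            b.completed = true; // would complete exceptionally with a timeout
    }

    public static void main(String[] args) {
        Batch b = new Batch();
        b.inflight = true;               // request sent, response never arrives
        maybeExpire(b, 2000);            // delivery timeout has elapsed
        System.out.println(b.completed); // false: inflight batch is not expired
    }
}
```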
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -910,8 +874,11 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, topicNames, time.milliseconds());
String nodeId = Integer.toString(destination);
+
+ long deliveryTimeoutMs = accumulator.getDeliveryTimeoutMs() - (now - earliestCreatedMs);
Review Comment:
Hmm, this is a bit more complicated than I thought since there are multiple
batches with different creation times in a request. For now, it's probably
better to just use requestTimeoutMs to keep the fix simple. This could extend
the delivery timeout by the request timeout in rare cases. We can file a
followup jira to see if there are better solutions in the future.
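A standalone sketch of the arithmetic involved (hypothetical helper, not the PR's code): with multiple creation times in one request, the oldest batch dictates the tightest remaining delivery budget, which is what makes a single per-request deadline awkward and a plain requestTimeoutMs the simpler choice.

```java
// Illustrative: remaining delivery budget for a request whose batches were
// created at different times. Names here are hypothetical.
public class TimeoutBudget {
    static long remainingDeliveryMs(long deliveryTimeoutMs, long nowMs, long[] createdMs) {
        long min = Long.MAX_VALUE;
        for (long c : createdMs)
            min = Math.min(min, deliveryTimeoutMs - (nowMs - c));
        return Math.max(0, min); // the oldest batch sets the tightest deadline
    }

    public static void main(String[] args) {
        // deliveryTimeoutMs = 120000, now = 5000, batches created at 0 and 4000:
        // the batch created at 0 leaves only 115000 ms of budget.
        System.out.println(remainingDeliveryMs(120000, 5000, new long[]{0, 4000}));
    }
}
```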
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -403,9 +367,7 @@ private long sendProducerData(long now) {
}
accumulator.resetNextBatchExpiryTime();
- List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
  List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
- expiredBatches.addAll(expiredInflightBatches);
Review Comment:
It would be useful to add a comment on why we are not expiring inflight
batches here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]