AndrewJSchofield commented on code in PR #17946:
URL: https://github.com/apache/kafka/pull/17946#discussion_r1858977786
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1223,6 +1223,11 @@ private void ensureValidRecordSize(int size) {
*/
@Override
public void flush() {
+ if (Thread.currentThread() == this.ioThread) {
+ log.error("flush invocation detected in the callback. This can cause a deadlock due to thread blocking.");
Review Comment:
I would change the wording here because actually the code is preventing the
call and that means there is no risk of a deadlock. Something like
"KafkaProducer.flush() invocation detected inside callback. This is not
permitted because of the risk of deadlock."
The message in the exception could be the same. The choice of KafkaException
seems most appropriate to me.
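To make the suggestion concrete, here is a minimal, self-contained sketch of the guard with the reworded message. It deliberately does not use the real KafkaProducer: `FlushGuardSketch`, `SketchException` (a stand-in for KafkaException) and `runOnIoThread` are hypothetical names invented for illustration, and the actual change in the PR may look different.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a producer; NOT the real KafkaProducer.
class FlushGuardSketch {

    // Stand-in for KafkaException so the sketch compiles without Kafka on the classpath.
    static class SketchException extends RuntimeException {
        SketchException(String message) {
            super(message);
        }
    }

    static final String MESSAGE =
        "KafkaProducer.flush() invocation detected inside callback. "
            + "This is not permitted because of the risk of deadlock.";

    // The thread that runs callbacks; plays the role of the producer's I/O thread.
    private volatile Thread ioThread;

    void flush() {
        // Reject the call outright instead of only logging: the I/O thread
        // would otherwise block waiting on work that only it can complete.
        if (Thread.currentThread() == ioThread) {
            throw new SketchException(MESSAGE);
        }
        // A real producer would block here until in-flight sends complete.
    }

    // Runs the callback on a dedicated thread registered as the I/O thread
    // and returns whatever the callback threw, or null if it completed normally.
    Throwable runOnIoThread(Runnable callback) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                callback.run();
            } catch (Throwable e) {
                failure.set(e);
            }
        }, "sketch-io-thread");
        ioThread = t;
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for callback", e);
        }
        return failure.get();
    }
}
```

With this shape, `runOnIoThread(producer::flush)` returns the exception carrying the message above, while calling `flush()` from any other thread passes the guard.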
##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -2154,6 +2154,32 @@ public void testCallbackAndInterceptorHandleError() {
}
}
+ @Test
+ public void shouldNotInvokeFlushInCallback() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
Review Comment:
Is this setting relevant? Having it in this test makes me think that the
test would only work with it, but I do not believe that to be the case.