g1geordie opened a new pull request #9707:
URL: https://github.com/apache/kafka/pull/9707


   Calling KafkaProducer.flush() inside a producer callback causes a deadlock.
   
   This happens because flush() waits for the future of every incomplete batch to complete:
   ```java
   // KafkaProducer
   public void flush() {
      .....
           try {
               this.accumulator.awaitFlushCompletion();
           } catch (InterruptedException e) {
               throw new InterruptException("Flush interrupted.", e);
           }
   }
   
   // RecordAccumulator
       public void awaitFlushCompletion() throws InterruptedException {
           try {
               for (ProducerBatch batch : this.incomplete.copyAll())
                   batch.produceFuture.await();
           } finally {
               this.flushesInProgress.decrementAndGet();
           }
       }
   ```
   
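   However, the future is completed only after the callbacks have run, so a flush() call made inside a callback waits on a future that cannot complete until that callback returns: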
   ```java
   // ProducerBatch
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        ...
           // execute callbacks
           for (Thunk thunk : thunks) {
               try {
                   if (exception == null) {
                       RecordMetadata metadata = thunk.future.value();
                       if (thunk.callback != null)
                           thunk.callback.onCompletion(metadata, null);
                   } else {
                       if (thunk.callback != null)
                           thunk.callback.onCompletion(null, exception);
                   }
               } catch (Exception e) {
                   log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
               }
           }
   
           produceFuture.done();
    }
   ```
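
The ordering problem above can be reproduced without Kafka. The following is a minimal sketch, not the producer's actual code: `Batch`, `flush(Batch, long)` and the class name are hypothetical stand-ins, a `CountDownLatch` plays the role of `produceFuture`, and the wait uses a timeout (which the real flush() does not have) so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in classes; none of these names exist in Kafka.
class CallbackFlushDeadlockSketch {

    /** Mimics ProducerBatch: the callback runs first, the future completes afterwards. */
    static final class Batch {
        final CountDownLatch produceFuture = new CountDownLatch(1);

        void completeFutureAndFireCallbacks(Runnable callback) {
            callback.run();            // user callback runs first
            produceFuture.countDown(); // equivalent of produceFuture.done()
        }
    }

    /** Mimics flush(): waits for the batch's future. The timeout exists only for the demo. */
    static boolean flush(Batch batch, long timeoutMs) {
        try {
            return batch.produceFuture.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** flush() is called from the callback, on the thread that must complete the future. */
    static boolean flushCompletesInsideCallback(long timeoutMs) {
        Batch batch = new Batch();
        AtomicBoolean completed = new AtomicBoolean(false);
        Thread sender = new Thread(() ->
                batch.completeFutureAndFireCallbacks(
                        () -> completed.set(flush(batch, timeoutMs))));
        sender.start();
        joinQuietly(sender);
        return completed.get();
    }

    /** flush() is called from the caller's thread while the sender thread completes the batch. */
    static boolean flushCompletesFromCallerThread(long timeoutMs) {
        Batch batch = new Batch();
        Thread sender = new Thread(() -> batch.completeFutureAndFireCallbacks(() -> { }));
        sender.start();
        boolean done = flush(batch, timeoutMs);
        joinQuietly(sender);
        return done;
    }

    public static void main(String[] args) {
        System.out.println("flush inside callback completed: "
                + flushCompletesInsideCallback(200));
        System.out.println("flush from caller thread completed: "
                + flushCompletesFromCallerThread(200));
    }
}
```

In this sketch the in-callback wait always times out, because the latch is counted down only after the callback returns; without the timeout the thread would block forever, which matches the deadlock described in this pull request.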
   
   



