[ 
https://issues.apache.org/jira/browse/FLINK-33692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800985#comment-17800985
 ] 

Martijn Visser commented on FLINK-33692:
----------------------------------------

[~tzulitai] Thoughts on this?

> FlinkKafkaProducer could miss super.close
> -----------------------------------------
>
>                 Key: FLINK-33692
>                 URL: https://issues.apache.org/jira/browse/FLINK-33692
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.6, 1.17.1
>            Reporter: zhuming
>            Priority: Major
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
>  
> When a Flink job is restarted, canceled, or fails, the close() method is executed.
> But in the following FlinkKafkaProducer source code, if flush() or close() throws an
> InterruptedException, super.close() is never invoked.
> {code:java}
> @Override
> public void close() throws FlinkKafkaException {
>     // First close the producer for current transaction.
>     try {
>         final KafkaTransactionState currentTransaction = currentTransaction();
>         if (currentTransaction != null) {
>             // to avoid exceptions on aborting transactions with some pending records
>             flush(currentTransaction);
>             // normal abort for AT_LEAST_ONCE and NONE do not clean up resources
>             // because of producer reusing, thus we need to close it manually
>             switch (semantic) {
>                 case EXACTLY_ONCE:
>                     break;
>                 case AT_LEAST_ONCE:
>                 case NONE:
>                     currentTransaction.producer.flush();
>                     currentTransaction.producer.close(Duration.ofSeconds(0));
>                     break;
>             }
>         }
>         // If flush() or close() was interrupted, super.close() might be missed.
>         super.close();
>     } catch (Exception e) {
>         asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
>     } finally {
>         // We may have to close producer of the current transaction in case
>         // some exception was thrown before the normal close routine finishes.
>         if (currentTransaction() != null) {
>             try {
>                 currentTransaction().producer.close(Duration.ofSeconds(0));
>             } catch (Throwable t) {
>                 LOG.warn("Error closing producer.", t);
>             }
>         }
>         // Make sure all the producers for pending transactions are closed.
>         pendingTransactions()
>                 .forEach(
>                         transaction -> {
>                             try {
>                                 transaction.getValue().producer.close(Duration.ofSeconds(0));
>                             } catch (Throwable t) {
>                                 LOG.warn("Error closing producer.", t);
>                             }
>                         });
>         // make sure we propagate pending errors
>         checkErroneous();
>     }
> } {code}
> super.close() executes the following code. It ensures the
> '{*}KafkaTransactionState{*}' is released correctly.
> {code:java}
> @Override
> public void close() throws Exception {
>     super.close();
>     if (currentTransactionHolder != null) {
>         abort(currentTransactionHolder.handle);
>         currentTransactionHolder = null;
>     }
> } {code}
>  
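> The control-flow issue can be shown with a minimal, self-contained sketch (hypothetical class and field names, not the actual Flink code): when super.close() sits after flush() inside the same try block, any exception from flush() skips it, whereas moving it into a finally block guarantees it runs.
> {code:java}
> public class CloseOrderDemo {
>     static boolean superClosed;
> 
>     // Mirrors the reported structure: super.close() is placed after flush()
>     // inside the same try block, so an exception from flush() skips it.
>     static void brokenClose(boolean flushThrows) {
>         superClosed = false;
>         try {
>             if (flushThrows) {
>                 throw new RuntimeException("flush() interrupted");
>             }
>             superClosed = true; // stands in for super.close()
>         } catch (Exception e) {
>             // exception recorded/swallowed, as in the reported close() method
>         }
>     }
> 
>     // A possible fix: call super.close() in a finally block so it always runs.
>     static void fixedClose(boolean flushThrows) {
>         superClosed = false;
>         try {
>             if (flushThrows) {
>                 throw new RuntimeException("flush() interrupted");
>             }
>         } catch (Exception e) {
>             // exception recorded/swallowed
>         } finally {
>             superClosed = true; // stands in for super.close()
>         }
>     }
> 
>     public static void main(String[] args) {
>         brokenClose(true);
>         System.out.println("broken, flush throws: superClosed=" + superClosed);
>         fixedClose(true);
>         System.out.println("fixed, flush throws: superClosed=" + superClosed);
>     }
> } {code}
> In the broken variant with a throwing flush(), superClosed stays false; in the fixed variant it is always true, which is the guarantee the transaction state cleanup needs.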



--
This message was sent by Atlassian Jira
(v8.20.10#820010)