zhuming created FLINK-33692:
-------------------------------

             Summary: FlinkKafkaProducer could miss super.close
                 Key: FLINK-33692
                 URL: https://issues.apache.org/jira/browse/FLINK-33692
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.17.1, 1.13.6
            Reporter: zhuming


 

When a Flink job is restarted, canceled, or fails, the close method is 
executed. However, in the following FlinkKafkaProducer source code, if flush 
or close throws an InterruptedException, control jumps to the catch block 
before super.close() is reached, so super.close() is skipped.
{code:java}
@Override
public void close() throws FlinkKafkaException {
    // First close the producer for current transaction.
    try {
        final KafkaTransactionState currentTransaction = currentTransaction();
        if (currentTransaction != null) {
            // to avoid exceptions on aborting transactions with some pending records
            flush(currentTransaction);

            // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of
            // producer reusing, thus
            // we need to close it manually
            switch (semantic) {
                case EXACTLY_ONCE:
                    break;
                case AT_LEAST_ONCE:
                case NONE:
                    currentTransaction.producer.flush();
                    currentTransaction.producer.close(Duration.ofSeconds(0));
                    break;
            }
        }
        // If flush() or close() was interrupted, super.close() is never reached.
        super.close();
    } catch (Exception e) {
        asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    } finally {
        // We may have to close producer of the current transaction in case some exception was
        // thrown before
        // the normal close routine finishes.
        if (currentTransaction() != null) {
            try {
                currentTransaction().producer.close(Duration.ofSeconds(0));
            } catch (Throwable t) {
                LOG.warn("Error closing producer.", t);
            }
        }
        // Make sure all the producers for pending transactions are closed.
        pendingTransactions()
                .forEach(
                        transaction -> {
                            try {
                                transaction.getValue().producer.close(Duration.ofSeconds(0));
                            } catch (Throwable t) {
                                LOG.warn("Error closing producer.", t);
                            }
                        });
        // make sure we propagate pending errors
        checkErroneous();
    }
} {code}
super.close() executes the following code, which ensures that the 
'{*}KafkaTransactionState{*}' is released correctly. 
{code:java}
@Override
public void close() throws Exception {
    super.close();

    if (currentTransactionHolder != null) {
        abort(currentTransactionHolder.handle);
        currentTransactionHolder = null;
    }
} {code}
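
One possible direction, shown only as a minimal standalone sketch and not as the actual Flink fix: move the super.close() call into the finally block so that it runs even when flush() throws. The classes BaseSink and SafeProducer, the failOnFlush flag, and the events list are hypothetical stand-ins for illustration; they are not Flink or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    /** Hypothetical stand-in for the parent sink; records that close() ran. */
    static class BaseSink {
        final List<String> events = new ArrayList<>();

        public void close() throws Exception {
            events.add("super.close");
        }
    }

    /** Hypothetical producer whose close() reaches super.close() on every path. */
    static class SafeProducer extends BaseSink {
        private final boolean failOnFlush; // assumption: simulates an interrupted flush
        Exception asyncException;

        SafeProducer(boolean failOnFlush) {
            this.failOnFlush = failOnFlush;
        }

        private void flush() throws InterruptedException {
            events.add("flush");
            if (failOnFlush) {
                throw new InterruptedException("interrupted during flush");
            }
        }

        @Override
        public void close() {
            try {
                flush();
            } catch (Exception e) {
                // Remember the first failure, as the original code does.
                asyncException = e;
            } finally {
                // super.close() now sits in finally, so an exception from
                // flush() can no longer skip it.
                try {
                    super.close();
                } catch (Exception e) {
                    if (asyncException == null) {
                        asyncException = e;
                    } else {
                        asyncException.addSuppressed(e);
                    }
                }
                events.add("producers closed");
            }
        }
    }

    public static void main(String[] args) {
        SafeProducer producer = new SafeProducer(true);
        producer.close();
        System.out.println(producer.events);
        System.out.println("recorded failure: " + producer.asyncException);
    }
}
```

In this sketch the failure from flush() is still recorded for later propagation, but the parent's cleanup is no longer conditional on the flush succeeding. Whether the real fix should also restore the thread's interrupt flag is left open here.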
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
