zhuming created FLINK-33692: ------------------------------- Summary: FlinkKafkaProducer could miss super.close Key: FLINK-33692 URL: https://issues.apache.org/jira/browse/FLINK-33692 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.1, 1.13.6 Reporter: zhuming
When flink job restarted 、canceled or failed. It will execute close method. But in following FlinkKafkaProducer source code. If flush or close throw InterruptedExecption. super.close method must be missed. {code:java} @Override public void close() throws FlinkKafkaException { // First close the producer for current transaction. try { final KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null) { // to avoid exceptions on aborting transactions with some pending records flush(currentTransaction); // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of // producer reusing, thus // we need to close it manually switch (semantic) { case EXACTLY_ONCE: break; case AT_LEAST_ONCE: case NONE: currentTransaction.producer.flush(); currentTransaction.producer.close(Duration.ofSeconds(0)); break; } } // If flush() or close() wasinterrupted, super.close might be missed. super.close(); } catch (Exception e) { asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); } finally { // We may have to close producer of the current transaction in case some exception was // thrown before // the normal close routine finishes. if (currentTransaction() != null) { try { currentTransaction().producer.close(Duration.ofSeconds(0)); } catch (Throwable t) { LOG.warn("Error closing producer.", t); } } // Make sure all the producers for pending transactions are closed. pendingTransactions() .forEach( transaction -> { try { transaction.getValue().producer.close(Duration.ofSeconds(0)); } catch (Throwable t) { LOG.warn("Error closing producer.", t); } }); // make sure we propagate pending errors checkErroneous(); } } {code} super.close() method is to execute following code. It ensures '{*}KafkaTransactionState{*}' released correctlly. {code:java} @Override public void close() throws Exception { super.close(); if (currentTransactionHolder != null) { abort(currentTransactionHolder.handle); currentTransactionHolder = null; } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)