[ https://issues.apache.org/jira/browse/FLINK-33692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800985#comment-17800985 ]
Martijn Visser commented on FLINK-33692: ---------------------------------------- [~tzulitai] Thoughts on this? > FlinkKafkaProducer could miss super.close > ----------------------------------------- > > Key: FLINK-33692 > URL: https://issues.apache.org/jira/browse/FLINK-33692 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.6, 1.17.1 > Reporter: zhuming > Priority: Major > Original Estimate: 96h > Remaining Estimate: 96h > > > When flink job restarted 、canceled or failed. It will execute close method. > But in following FlinkKafkaProducer source code. If flush or close throw > InterruptedExecption. super.close method must be missed. > {code:java} > @Override > public void close() throws FlinkKafkaException { > // First close the producer for current transaction. > try { > final KafkaTransactionState currentTransaction = currentTransaction(); > if (currentTransaction != null) { > // to avoid exceptions on aborting transactions with some pending > records > flush(currentTransaction); > // normal abort for AT_LEAST_ONCE and NONE do not clean up > resources because of > // producer reusing, thus > // we need to close it manually > switch (semantic) { > case EXACTLY_ONCE: > break; > case AT_LEAST_ONCE: > case NONE: > currentTransaction.producer.flush(); > currentTransaction.producer.close(Duration.ofSeconds(0)); > break; > } > } > // If flush() or close() wasinterrupted, super.close might be missed. > super.close(); > } catch (Exception e) { > asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); > } finally { > // We may have to close producer of the current transaction in case > some exception was > // thrown before > // the normal close routine finishes. > if (currentTransaction() != null) { > try { > currentTransaction().producer.close(Duration.ofSeconds(0)); > } catch (Throwable t) { > LOG.warn("Error closing producer.", t); > } > } > // Make sure all the producers for pending transactions are closed. > pendingTransactions() > .forEach( > transaction -> { > try { > > transaction.getValue().producer.close(Duration.ofSeconds(0)); > } catch (Throwable t) { > LOG.warn("Error closing producer.", t); > } > }); > // make sure we propagate pending errors > checkErroneous(); > } > } {code} > super.close() method is to execute following code. It ensures > '{*}KafkaTransactionState{*}' released correctlly. > {code:java} > @Override > public void close() throws Exception { > super.close(); > if (currentTransactionHolder != null) { > abort(currentTransactionHolder.handle); > currentTransactionHolder = null; > } > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)