[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673336#comment-16673336 ]

ASF GitHub Bot commented on FLINK-10455:
----------------------------------------

pnowojski commented on issue #6989: [FLINK-10455][Kafka Tx] Close transactional 
producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989#issuecomment-435429850
 
 
   Regarding your question @azagrebin, I think my bug fix for
https://issues.apache.org/jira/browse/FLINK-8086 (which added "ignore
ProducerFencedException") might indeed have turned @GJL's timeout code:
   ```
   } catch (final Exception e) {
       final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
       if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
           LOG.error("Error while committing transaction {}. " +
                   "Transaction has been open for longer than the transaction timeout ({})." +
                   "Commit will not be attempted again. Data loss might have occurred.",
               transactionHolder.handle, transactionTimeout, e);
       } else {
           throw e;
       }
   }
   ```
   into dead code at the moment. But I think my bug fix is more important
(from my commit message):
   > [FLINK-8086][kafka] Ignore ProducerFencedException during recovery
   >
   > ProducerFencedException can happen if we restore twice from the same
   > checkpoint or if we restore from an old savepoint. In both cases the
   > transactional.ids that we want to recoverAndCommit have already been
   > committed and reused. Reusing means that they will be known by Kafka's
   > brokers under a newer producerId/epochId, which will result in a
   > ProducerFencedException if we try to commit some old (and already
   > committed) transaction again.
   >
   > Ignoring this exception might hide some bugs/issues, because instead of
   > failing we might have a semi-silent (with a warning) data loss.
   
   I think the last sentence answers your question.
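   
   A minimal sketch of that interaction (hypothetical names, not the actual
   Flink classes) may make the dead-code point clearer: once recoverAndCommit()
   itself swallows ProducerFencedException, the caller's catch block, and with
   it the ignoreFailuresAfterTransactionTimeout logging path, can no longer be
   reached through that exception.
   ```
   import java.time.Clock;
   import org.apache.kafka.common.errors.ProducerFencedException;

   // Hypothetical stand-ins, not the actual Flink source.
   class TimeoutDeadCodeSketch {

       // Models the FLINK-8086 behavior: the fenced producer is ignored here.
       void recoverAndCommit(Object transactionHandle) {
           try {
               // resume the restored transaction and commit it ... (sketch)
           } catch (ProducerFencedException e) {
               // The transactional.id was already committed and reused under a
               // newer producerId/epoch, so the broker fences us; ignore it.
           }
       }

       // Models the caller that owns the quoted timeout handling.
       void commitWithTimeoutCheck(
               Object handle,
               long transactionStartTime,
               long transactionTimeout,
               boolean ignoreFailuresAfterTransactionTimeout,
               Clock clock) throws Exception {
           try {
               recoverAndCommit(handle); // no longer throws for fenced producers
           } catch (Exception e) {
               final long elapsedTime = clock.millis() - transactionStartTime;
               if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
                   // the "data loss might have occurred" logging branch: now
                   // unreachable via ProducerFencedException, hence dead code
               } else {
                   throw e;
               }
           }
       }
   }
   ```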

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we
> may get a {{ProducerFencedException}}. Documentation around
> {{ProducerFencedException}} explicitly states that we should close the
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an
> exception, we don't clean up (nor try to commit) any other transaction.
> -> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}}
> simply iterates over {{pendingCommitTransactions}}, which is not touched
> during {{close()}}.
> Now, if we restart the failing job on the same Flink cluster, any resources
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
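
A hedged sketch of the cleanup the ticket asks for (a hypothetical helper, not
FlinkKafkaProducer011's actual code): per Kafka's contract, a producer that
hits ProducerFencedException is unusable and must be closed, and the remaining
pending transactions could be aborted and closed on failure so that a
restarted job on the same cluster does not leak producer resources.
```
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

// Hypothetical illustration, not FlinkKafkaProducer011's actual code.
class ProducerCleanupSketch {

    // Close the producer as soon as it is fenced, as the Kafka docs require.
    void commitOrClose(KafkaProducer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close(); // release the producer's threads and sockets
            throw e;
        }
    }

    // What close() could do with pendingCommitTransactions instead of
    // leaving them lingering: best-effort abort, then close every producer.
    void closeAllPending(Iterable<KafkaProducer<byte[], byte[]>> pendingCommitTransactions) {
        for (KafkaProducer<byte[], byte[]> producer : pendingCommitTransactions) {
            try {
                producer.abortTransaction();
            } catch (RuntimeException ignored) {
                // a fenced or already-failed producer may reject the abort
            } finally {
                producer.close();
            }
        }
    }
}
```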



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
