[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159108#comment-17159108 ]

freezhan edited comment on FLINK-11654 at 7/16/20, 10:48 AM:
-

[~becket_qin] Are you still working on this issue? What are the next steps?
I agree with the outcome of the discussion:
# Expose {{transactionIdPrefix}} (and add length checks)
# Fail fast when overriding Kafka properties

I have a similar problem: https://issues.apache.org/jira/browse/FLINK-17691 [https://github.com/apache/flink/pull/12157]
If you don't plan to continue fixing it, I would like to take over.

was (Author: freezhan):
[~becket_qin] Are you still working on this question? What's next?
I agree with the result of the discussion.
# Expose {{transactionIdPrefix}}
# Fail Fast when overriding Kafka properties

I have a similar problem https://issues.apache.org/jira/browse/FLINK-17691 [https://github.com/apache/flink/pull/12157]
If you don't think about continuing to fix it, then I want to take over.

> Multiple transactional KafkaProducers writing to same cluster have clashing
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.7.1
> Reporter: Jürgen Kreileder
> Assignee: Jiangjie Qin
> Priority: Major
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go
> into a restart cycle.
> Example exception from the Kafka log:
>
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing
> append operation on partition finding-commands-dev-1-0
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is
> no longer valid. There is probably another producer with a newer epoch. 483
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash
> between different Jobs (and Clusters).
>
>
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>          nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>              NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>          transactionalIdsGenerator = new TransactionalIdsGenerator(
> +            // the prefix probably should include job id and maybe cluster id
>              getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>              getRuntimeContext().getIndexOfThisSubtask(),
>              getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)
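The clash the reporter describes can be reproduced with a small, self-contained model. This is a hypothetical simplification, not Flink's actual `TransactionalIdsGenerator`: it assumes each subtask owns a pool of `poolSize` consecutive numeric suffixes appended to `taskName + "-" + operatorUid`, which is the only property the issue relies on (no job or cluster id in the prefix). The `checkedPrefix` method only illustrates the "expose the prefix, with length checks, and fail fast" idea from the comment; its `maxLength` is a caller-chosen assumption, and all class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of prefix-based transactional ids (NOT Flink's real generator). */
final class TransactionalIdClash {

    /** Prefix built as the issue describes: task name + "-" + operator UID, no job/cluster id. */
    static String prefix(String taskName, String operatorUid) {
        return taskName + "-" + operatorUid;
    }

    /** Assumed pool layout: subtask i owns ids prefix-(i*poolSize) .. prefix-(i*poolSize+poolSize-1). */
    static List<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + ((long) subtaskIndex * poolSize + i));
        }
        return ids;
    }

    /** Ids that two producers would both try to use, i.e. the ones that fence each other. */
    static Set<String> overlap(List<String> a, List<String> b) {
        Set<String> common = new HashSet<>(a);
        common.retainAll(b);
        return common;
    }

    /** Fail-fast validation of a user-supplied prefix; maxLength is a hypothetical limit. */
    static String checkedPrefix(String prefix, int maxLength) {
        if (prefix.isEmpty() || prefix.length() > maxLength) {
            throw new IllegalArgumentException(
                "transactional id prefix must be 1.." + maxLength + " chars, got " + prefix.length());
        }
        return prefix;
    }

    public static void main(String[] args) {
        // Two different jobs with identically named sinks and the same UID.
        String jobA = prefix("Sink: kafka-sink", "sink-uid");
        String jobB = prefix("Sink: kafka-sink", "sink-uid");
        System.out.println(overlap(idsFor(jobA, 0, 5), idsFor(jobB, 0, 5)).size()); // 5

        // A distinct UID in one of the jobs makes the pools disjoint.
        String jobC = prefix("Sink: kafka-sink", "orders-sink-uid");
        System.out.println(overlap(idsFor(jobA, 0, 5), idsFor(jobC, 0, 5)).size()); // 0
    }
}
```

In this model two jobs whose sinks share a task name and UID contend for exactly the same ids, which is consistent with the ProducerFencedException restart loop above; changing either input separates the pools.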
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884267#comment-16884267 ]

Jiangjie Qin edited comment on FLINK-11654 at 7/13/19 2:04 AM:
---

[~knaufk] That makes sense. In fact, the {{transactional.id}} config is already supported by Kafka producers, so we can simply add it to the current prefix. This config can be set via the {{producerProperties}} config. There are still one or two constructors of the {{FlinkKafkaProducer}} that do not take {{producerProperties}}; we can keep the behavior as is in that case. If we do that, the behavior would be:
# When the user does not provide {{producerProperties}}, or does not specify {{transactional.id}} in the {{producerProperties}}, the behavior will be the same as the current behavior.
# When the user provides a {{transactional.id}} in the {{producerProperties}}, that {{transactional.id}} will be part of the prefix. If the user-provided {{transactional.id}} is unique, the transactional.id that is eventually used will also be unique.

I think this is a good approach. It is fully backwards compatible and does not introduce any new API.

was (Author: becket_qin):
[~knaufk] That makes sense. In fact the {{transactional.id}} config is already supported by Kafka producers, we can simply use add that to the current prefix. This config can be set via {{producerProperties}} config. There are still 1 or 2 constructors of the {{FlinkKafkaProducer}} that do not take {{producerProperties}}. We can keep the behavior as is in that case. If we do that, the behavior would be:
# When user did not provide \{{producerProperties}} or did not specify \{{transactional.id}} in the \{{producerProperties}}, the behavior will be the same as current behavior.
# When user provided a {{transactional.id}} in the {{producerProperties}}, that \{{transactional.id}} will be part of the prefix.
If the user provided {{transactional.id}} config is unique, the eventually used transactional.id will also be unique. I think this is a good approach. It is fully backwards compatible and do not introduce any new API.
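The backwards-compatible rule proposed in this comment can be sketched as a small prefix function. This is a hypothetical illustration, not the code that was merged into Flink: the class and method names are made up, and "today's prefix" is modelled as `taskName + "-" + operatorUid` as in the issue description. Only the `transactional.id` key itself is real Kafka producer configuration.

```java
import java.util.Properties;

/** Hypothetical sketch of the proposal: user transactional.id (if any) + current prefix. */
final class PrefixFromProducerProperties {

    // Real Kafka producer config key (ProducerConfig.TRANSACTIONAL_ID_CONFIG in kafka-clients).
    static final String TRANSACTIONAL_ID = "transactional.id";

    static String transactionalIdPrefix(Properties producerProperties, String taskName, String operatorUid) {
        String current = taskName + "-" + operatorUid; // modelled "current" prefix
        if (producerProperties == null) {
            return current; // constructors that take no producerProperties: unchanged
        }
        String userId = producerProperties.getProperty(TRANSACTIONAL_ID);
        if (userId == null || userId.isEmpty()) {
            return current; // case 1: no transactional.id given, behaviour unchanged
        }
        return userId + "-" + current; // case 2: the user's id becomes part of the prefix
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(transactionalIdPrefix(props, "Sink: kafka-sink", "sink-uid"));
        // prints: Sink: kafka-sink-sink-uid

        props.setProperty(TRANSACTIONAL_ID, "dc1-orders-job");
        System.out.println(transactionalIdPrefix(props, "Sink: kafka-sink", "sink-uid"));
        // prints: dc1-orders-job-Sink: kafka-sink-sink-uid
    }
}
```

Prepending rather than replacing keeps the per-operator part, so two sinks in the same job still get different prefixes even when they share the user-supplied id.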
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883584#comment-16883584 ]

Piotr Nowojski edited comment on FLINK-11654 at 7/12/19 7:15 AM:
-

I'm not sure. Making the stability and/or uniqueness of {{JobName}} part of an implicit {{FlinkKafkaProducer}} contract might lead to some weird problems and "gotchas": someone/something changes the {{JobName}} and suddenly {{FlinkKafkaProducer}} breaks. Setting an operator UID, by contrast, is explicit, has very similar semantics with respect to state changes, and is also a very good idea on its own (state compatibility).

was (Author: pnowojski):
I'm not sure. Making stability and/or uniqueness of {{JobName}} part of implicit {{FlinkKafkaProducer}} contract, might lead to some weird problems and "gotchas" - someone/something changes to {{JobName}} and suddenly {{FlinkKafkaProducer}} brakes. Setting an operator UID is on the one hand explicit, has also very similar semantic with respect to state changes and setting operator UID is also a very good idea for other reasons.
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16883584#comment-16883584 ]

Piotr Nowojski edited comment on FLINK-11654 at 7/12/19 7:14 AM:
-

I'm not sure. Making the stability and/or uniqueness of {{JobName}} part of an implicit {{FlinkKafkaProducer}} contract might lead to some weird problems and "gotchas": someone/something changes the {{JobName}} and suddenly {{FlinkKafkaProducer}} breaks. Setting an operator UID, by contrast, is explicit, has very similar semantics with respect to state changes, and is also a very good idea for other reasons.

was (Author: pnowojski):
I'm not sure. Making stability and/or uniqueness of {{JobName}} part of implicit {{FlinkKafkaProducer}} contract, might lead to some weird problems and "gotchas" - someone/something changes to {{JobName}} and suddenly {{FlinkKafkaProducer}} brakes.
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788251#comment-16788251 ]

Dawid Wysakowicz edited comment on FLINK-11654 at 3/8/19 8:04 PM:
--

Regarding making this setting mandatory: we recently introduced (in 1.9) a flag that forces users to set UIDs for all operators in a job. See [FLINK-11653].

was (Author: dawidwys):
Adding to making this setting mandatory, we recently introduced (in 1.8) a flag that forces to set uids for all operators in a job. See [FLINK-11653]
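The effect of such a "require explicit UIDs" flag can be modelled as a build-time check that fails when any operator would fall back to an auto-generated UID. This is a hypothetical stand-in, not Flink's implementation of FLINK-11653: `RequireExplicitUids`, `Operator` and `validate` are made-up names, and a `null` uid is the sketch's own convention for "auto-generated".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of enforcing explicit operator UIDs before a job is submitted. */
final class RequireExplicitUids {

    /** Stand-in for an operator in the job graph; uid == null means "auto-generated". */
    static final class Operator {
        final String name;
        final String uid;

        Operator(String name, String uid) {
            this.name = name;
            this.uid = uid;
        }
    }

    /** Throws if any operator lacks an explicit UID, listing the offending operators. */
    static void validate(List<Operator> operators) {
        List<String> missing = new ArrayList<>();
        for (Operator op : operators) {
            if (op.uid == null || op.uid.isEmpty()) {
                missing.add(op.name);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Operators without an explicit UID: " + missing);
        }
    }

    public static void main(String[] args) {
        List<Operator> ops = List.of(
            new Operator("Source: orders", "orders-source"),
            new Operator("Sink: kafka-sink", null));
        try {
            validate(ops);
        } catch (IllegalStateException e) {
            // prints: Operators without an explicit UID: [Sink: kafka-sink]
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at submission time surfaces the missing UID before an exactly-once sink ever starts, which is the point of making the setting mandatory.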
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788247#comment-16788247 ]

Chris Slotterback edited comment on FLINK-11654 at 3/8/19 7:59 PM:
---

In my testing, setting the {{UID}} does avoid transactionId collisions, as long as it remains unique across *every* producer writing to that topic. In a multi-datacenter configuration, I had collisions occurring across multiple Flink clusters.
If setting the operator {{UID}} (or a seed) is the accepted workaround, I think it could certainly be made more obvious, or even be required for users who configure {{Semantic.EXACTLY_ONCE}} producers.

was (Author: cslotterback):
I do think that if setting the {{UID}} operator (or a seed) is the the accepted workaround, it could certainly be more obvious or even set as required for the users to configure to use {{Semantics.EXACTLY_ONCE}} producers
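Because uniqueness must hold across *every* exactly-once producer that writes to the Kafka cluster, including jobs running on other Flink clusters or datacenters, one option is a pre-deployment check over all configured prefixes. The sketch below is a hypothetical helper, not part of Flink: it assumes you can collect a `producer -> transactional-id prefix` map from all deployments, and the producer names and prefixes in `main` are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical cross-cluster check for shared transactional-id prefixes. */
final class PrefixRegistry {

    /** Returns every prefix used by two or more producers, with the sorted producers sharing it. */
    static Map<String, List<String>> duplicates(Map<String, String> prefixByProducer) {
        Map<String, List<String>> producersByPrefix = new TreeMap<>();
        for (Map.Entry<String, String> entry : prefixByProducer.entrySet()) {
            producersByPrefix.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
        }
        // Keep only shared prefixes, i.e. potential transactional-id clashes.
        producersByPrefix.values().removeIf(producers -> producers.size() < 2);
        producersByPrefix.values().forEach(producers -> producers.sort(null));
        return producersByPrefix;
    }

    public static void main(String[] args) {
        Map<String, String> prefixes = new TreeMap<>();
        // The same job deployed in two datacenters with the same sink name and UID.
        prefixes.put("dc1/orders-job", "Sink: kafka-sink-sink-uid");
        prefixes.put("dc2/orders-job", "Sink: kafka-sink-sink-uid");
        System.out.println(duplicates(prefixes));
        // prints: {Sink: kafka-sink-sink-uid=[dc1/orders-job, dc2/orders-job]}

        // A datacenter-specific UID removes the clash.
        prefixes.put("dc2/orders-job", "Sink: kafka-sink-dc2-sink-uid");
        System.out.println(duplicates(prefixes)); // prints: {}
    }
}
```

The check only compares prefixes; it assumes, as in the issue, that equal prefixes imply overlapping id pools and distinct prefixes do not.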
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788239#comment-16788239 ]

Tim edited comment on FLINK-11654 at 3/8/19 7:40 PM:
-

It is not clear to me what the _recommended_ workaround is. Is it to name the task so that different jobs end up with different transaction IDs?
FWIW, I think the TransactionIdSeed is a good idea, in that it makes it clear to the user that a token is used to uniquely identify the pool of KafkaProducers. I have not thought about whether and how that would impact recovery and savepoints, though.
I am also adding my findings: http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3C9E3F033F-0CB0-4659-A487-39BB728C4F01%40comcast.com%3E

was (Author: victtim):
It is not clear to me what the _recommended_ workaround is. Is it to name the task so that different jobs will end up with different transaction IDs? FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear to the user that a token is used to uniquely identify the pool of KafkaProducers. I have not thought about if and how that would impact recovery and savepoints though.
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16780691#comment-16780691 ]

Piotr Nowojski edited comment on FLINK-11654 at 2/28/19 4:16 PM:
-

Adding a new unique configuration like {{TransactionIdSeed}} would increase code complexity, and I do not see any benefit for a user over setting the operator {{UID}}. Apart from that, note that you cannot change the {{UID}} retroactively without a stop-with-savepoint (otherwise you would break exactly-once).
So I would be in favour of closing this issue and making the "workaround" the official way to handle this use case.

was (Author: pnowojski):
Adding a new unique configuration like {{TransactionIdSeed}} would increase code complexity and I do not see any benefit for the user over setting operator {{UID}} apart of the fact, that you can not change {{UID}} in a retrospection without stop with savepoint (otherwise you would brake exactly once). So I would be in favour of closing this issue and make the "workaround" official way to handle this use case.
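Why the {{UID}} (or a seed) must not change across a restore can be illustrated with id pools. In Kafka, initializing a transactional producer with a given `transactional.id` fences older producers with that id and aborts their unfinished transaction; a restarted job can therefore only clean up ids it re-initializes. The sketch below is a hypothetical model with made-up names, reusing the simplified assumption that ids are `prefix-0 .. prefix-(size-1)`; it shows that a changed prefix leaves every old id orphaned.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model: which old transactional ids would a restored job never touch again? */
final class StableUidCheck {

    /** Assumed pool layout for a single subtask: prefix-0 .. prefix-(size-1). */
    static List<String> pool(String prefix, int size) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            ids.add(prefix + "-" + i);
        }
        return ids;
    }

    /** Ids used before the restart that the restarted job will not re-initialize (and so not fence/abort). */
    static Set<String> orphaned(List<String> before, List<String> after) {
        Set<String> orphans = new HashSet<>(before);
        orphans.removeAll(after);
        return orphans;
    }

    public static void main(String[] args) {
        List<String> before = pool("Sink: kafka-sink-sink-uid", 3);
        // Same UID after restore: every old id is re-initialized, nothing is left dangling.
        System.out.println(orphaned(before, pool("Sink: kafka-sink-sink-uid", 3)).size()); // 0
        // Changed UID without stop-with-savepoint: all old ids are orphaned.
        System.out.println(orphaned(before, pool("Sink: kafka-sink-new-uid", 3)).size()); // 3
    }
}
```

Any transaction still open under an orphaned id is neither committed nor aborted by the new job in this model, which is the exactly-once hazard the comment refers to.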