[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2020-07-16 Thread freezhan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159108#comment-17159108
 ] 

freezhan edited comment on FLINK-11654 at 7/16/20, 10:48 AM:
-

[~becket_qin] 

Are you still working on this issue?

What's next?

 

I agree with the outcome of the discussion:
 # Expose {{transactionIdPrefix}} (and add length checks); see the sketch below
 # Fail fast when overriding Kafka properties
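
For illustration, a minimal sketch of what an exposed prefix can look like, modeled on the {{setTransactionalIdPrefix}} option of the newer {{KafkaSink}} API (Flink 1.14+), which is not the connector discussed in this thread; broker address, topic, and prefix are placeholders:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("orders")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // unique per job (and cluster); a length check, as proposed above,
        // would reject prefixes that make the final transactional.id too long
        .setTransactionalIdPrefix("prod-cluster-orders-job")
        .build();
{code}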

 

I ran into a similar problem:

https://issues.apache.org/jira/browse/FLINK-17691

[https://github.com/apache/flink/pull/12157]

 

If you don't plan to continue working on it, I'd like to take it over.


was (Author: freezhan):
[~becket_qin] 

Are you still working on this issue?

What's next?

 

I agree with the outcome of the discussion:
 # Expose {{transactionIdPrefix}}
 # Fail fast when overriding Kafka properties

 

I ran into a similar problem:

https://issues.apache.org/jira/browse/FLINK-17691

[https://github.com/apache/flink/pull/12157]

 

If you don't plan to continue working on it, I'd like to take it over.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Jiangjie Qin
>Priority: Major
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>      nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>          NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>      transactionalIdsGenerator = new TransactionalIdsGenerator(
> +        // the prefix probably should include job id and maybe cluster id
>          getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>          getRuntimeContext().getIndexOfThisSubtask(),
>          getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-07-12 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884267#comment-16884267
 ] 

Jiangjie Qin edited comment on FLINK-11654 at 7/13/19 2:04 AM:
---

[~knaufk] That makes sense. In fact, the {{transactional.id}} config is already 
supported by Kafka producers, so we can simply add it to the current prefix. It 
can be set via the {{producerProperties}}. There are still one or two 
constructors of {{FlinkKafkaProducer}} that do not take {{producerProperties}}; 
we can keep the behavior as is in that case.

If we do that, the behavior would be:
 # When the user does not provide {{producerProperties}} or does not specify 
{{transactional.id}} in them, the behavior stays the same as today.
 # When the user provides a {{transactional.id}} in the {{producerProperties}}, 
that {{transactional.id}} becomes part of the prefix. If the user-provided 
{{transactional.id}} is unique, the eventually used {{transactional.id}} will 
also be unique.

I think this is a good approach. It is fully backwards compatible and does not 
introduce any new API.
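
A minimal sketch of the proposed usage; folding the {{transactional.id}} into the generated prefix is the proposal above, not shipped behavior. Topic and id values are placeholders, and constructor signatures vary slightly across Flink versions:

{code:java}
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "broker:9092");
// unique per job; under the proposal this becomes part of the transactional-id prefix
producerProperties.setProperty("transactional.id", "orders-job-seed");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "orders",                                   // default topic (placeholder)
        new SimpleStringSchema(),
        producerProperties,
        Optional.empty(),                           // no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
{code}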


was (Author: becket_qin):
[~knaufk] That makes sense. In fact, the {{transactional.id}} config is already 
supported by Kafka producers, so we can simply add it to the current prefix. It 
can be set via the {{producerProperties}}. There are still one or two 
constructors of {{FlinkKafkaProducer}} that do not take {{producerProperties}}; 
we can keep the behavior as is in that case.

If we do that, the behavior would be:
 # When the user does not provide {{producerProperties}} or does not specify 
{{transactional.id}} in them, the behavior stays the same as today.
 # When the user provides a {{transactional.id}} in the {{producerProperties}}, 
that {{transactional.id}} becomes part of the prefix. If the user-provided 
{{transactional.id}} is unique, the eventually used {{transactional.id}} will 
also be unique.

I think this is a good approach. It is fully backwards compatible and does not 
introduce any new API.



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-07-12 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883584#comment-16883584
 ] 

Piotr Nowojski edited comment on FLINK-11654 at 7/12/19 7:15 AM:
-

I'm not sure. Making the stability and/or uniqueness of the {{JobName}} part of 
the implicit {{FlinkKafkaProducer}} contract might lead to some weird problems 
and "gotchas": someone or something changes the {{JobName}} and suddenly 
{{FlinkKafkaProducer}} breaks.

Setting an operator UID, on the other hand, is explicit, has very similar 
semantics with respect to state changes, and is a very good idea on its own 
(state compatibility).
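
A minimal sketch of this workaround, with placeholder names; pinning the sink's UID keeps {{getOperatorUniqueID()}}, and hence the generated transactional-id prefix, stable and controllable:

{code:java}
// the UID must be unique across all jobs (and clusters) writing to the topic
stream.addSink(producer)
        .uid("orders-job-kafka-sink")
        .name("orders-kafka-sink");
{code}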


was (Author: pnowojski):
I'm not sure. Making the stability and/or uniqueness of the {{JobName}} part of 
the implicit {{FlinkKafkaProducer}} contract might lead to some weird problems 
and "gotchas": someone or something changes the {{JobName}} and suddenly 
{{FlinkKafkaProducer}} breaks.

Setting an operator UID, on the other hand, is explicit, has very similar 
semantics with respect to state changes, and is a very good idea for other 
reasons.



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-07-12 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883584#comment-16883584
 ] 

Piotr Nowojski edited comment on FLINK-11654 at 7/12/19 7:14 AM:
-

I'm not sure. Making the stability and/or uniqueness of the {{JobName}} part of 
the implicit {{FlinkKafkaProducer}} contract might lead to some weird problems 
and "gotchas": someone or something changes the {{JobName}} and suddenly 
{{FlinkKafkaProducer}} breaks.

Setting an operator UID, on the other hand, is explicit, has very similar 
semantics with respect to state changes, and is a very good idea for other 
reasons.


was (Author: pnowojski):
I'm not sure. Making the stability and/or uniqueness of the {{JobName}} part of 
the implicit {{FlinkKafkaProducer}} contract might lead to some weird problems 
and "gotchas": someone or something changes the {{JobName}} and suddenly 
{{FlinkKafkaProducer}} breaks.



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788251#comment-16788251
 ] 

Dawid Wysakowicz edited comment on FLINK-11654 at 3/8/19 8:04 PM:
--

Adding to making this setting mandatory: we recently introduced (in 1.9) a flag 
that forces users to set UIDs for all operators in a job. See [FLINK-11653].
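
A minimal sketch, assuming the FLINK-11653 flag is {{ExecutionConfig#disableAutoGeneratedUIDs}}; with it set, building the job graph fails for any operator that relies on an auto-generated UID:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// reject auto-generated UIDs so every operator must carry an explicit uid()
env.getConfig().disableAutoGeneratedUIDs();
{code}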


was (Author: dawidwys):
Adding to making this setting mandatory: we recently introduced (in 1.8) a flag 
that forces users to set UIDs for all operators in a job. See [FLINK-11653].



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788247#comment-16788247
 ] 

Chris Slotterback edited comment on FLINK-11654 at 3/8/19 7:59 PM:
---

Setting the {{UID}} does work in my testing to avoid transactional-id 
collisions, as long as it remains unique across *every* producer writing to 
that topic. In a multi-datacenter configuration, I had collisions occurring 
across multiple Flink clusters.

I do think that if setting the operator {{UID}} (or a seed) is the accepted 
workaround, it could certainly be made more obvious, or even required, for 
users who configure {{Semantic.EXACTLY_ONCE}} producers.
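
A minimal sketch of the cross-cluster variant; {{dc.id}} is a hypothetical deployment parameter, and the point is only that the UID must embed something unique per deployment:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool;

// fold a per-datacenter token into the UID so the transactional-id prefix stays
// unique even when the same job runs in several Flink clusters writing to one topic
ParameterTool params = ParameterTool.fromArgs(args);
String dcId = params.get("dc.id");   // e.g. "dc1", "dc2"
stream.addSink(producer).uid(dcId + "-orders-kafka-sink");
{code}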


was (Author: cslotterback):
I do think that if setting the operator {{UID}} (or a seed) is the accepted 
workaround, it could certainly be made more obvious, or even required, for 
users who configure {{Semantic.EXACTLY_ONCE}} producers.



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788239#comment-16788239
 ] 

Tim edited comment on FLINK-11654 at 3/8/19 7:40 PM:
-

It is not clear to me what the _recommended_ workaround is. Is it to name the 
task so that different jobs end up with different transaction IDs?

FWIW, I think the TransactionIdSeed is a good idea in that it makes clear to 
the user that a token is used to uniquely identify the pool of KafkaProducers. 
I have not thought about if and how that would impact recovery and savepoints, 
though.

Also adding my findings:
http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3C9E3F033F-0CB0-4659-A487-39BB728C4F01%40comcast.com%3E


was (Author: victtim):
It is not clear to me what the _recommended_ workaround is. Is it to name the 
task so that different jobs end up with different transaction IDs?

FWIW, I think the TransactionIdSeed is a good idea in that it makes clear to 
the user that a token is used to uniquely identify the pool of KafkaProducers. 
I have not thought about if and how that would impact recovery and savepoints, 
though.



[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-02-28 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780691#comment-16780691
 ] 

Piotr Nowojski edited comment on FLINK-11654 at 2/28/19 4:16 PM:
-

Adding a new unique configuration like {{TransactionIdSeed}} would increase 
code complexity, and I do not see any benefit for a user over setting an 
operator {{UID}}, apart from the fact that you cannot change a {{UID}} 
retrospectively without stop-with-savepoint (otherwise you would break 
exactly-once). So I would be in favour of closing this issue and making the 
"workaround" the official way to handle this use case.


was (Author: pnowojski):
Adding a new unique configuration like {{TransactionIdSeed}} would increase 
code complexity, and I do not see any benefit for the user over setting an 
operator {{UID}}, apart from the fact that you cannot change a {{UID}} 
retrospectively without stop-with-savepoint (otherwise you would break 
exactly-once). So I would be in favour of closing this issue and making the 
"workaround" the official way to handle this use case.
