[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2022-01-16 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476775#comment-17476775
 ] 

Wenhao Ji commented on FLINK-16478:
---

As discussed in the 
[thread|https://lists.apache.org/thread/n8omkpjf1mk9jphx38b8tfrs4h3nxo3z], this 
feature won't be implemented. [~trohrmann], please help me close this ticket.

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Assignee: Wenhao Ji
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Sometimes we need to change the log level to get more information to resolve 
> a bug. Currently we have to stop the job, modify conf/log4j.properties, and 
> resubmit it. I think it would be better to add a REST API to modify the log level.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2021-12-20 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17463024#comment-17463024
 ] 

Wenhao Ji commented on FLINK-16478:
---

Thanks, [~trohrmann]!



[jira] [Commented] (FLINK-16478) add restApi to modify loglevel

2021-12-20 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462629#comment-17462629
 ] 

Wenhao Ji commented on FLINK-16478:
---

Hi, [~trohrmann]. I'm interested in implementing this feature. Could you please 
assign this ticket to me if no one is working on it? I will draft a FLIP 
according to [~dixingx...@yeah.net]'s design and open a thread on the dev 
mailing list to discuss this feature further.

> add restApi to modify loglevel 
> ---
>
> Key: FLINK-16478
> URL: https://issues.apache.org/jira/browse/FLINK-16478
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: xiaodao
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-23899) Translate the "Elastic Scaling" page into chinese

2021-08-20 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402138#comment-17402138
 ] 

Wenhao Ji commented on FLINK-23899:
---

Hi, [~jark]. I would like to translate this doc. Would you please assign this 
ticket to me?

> Translate the "Elastic Scaling" page into chinese
> -
>
> Key: FLINK-23899
> URL: https://issues.apache.org/jira/browse/FLINK-23899
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation, Project Website
>Affects Versions: 1.13.2
>Reporter: Wenhao Ji
>Priority: Minor
>
> Translate [the "Elastic Scaling" 
> page|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/]
>  into Chinese.
> The original documentation in English can be found at 
> docs/content/docs/deployment/elastic_scaling.md.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23899) Translate the "Elastic Scaling" page into chinese

2021-08-20 Thread Wenhao Ji (Jira)
Wenhao Ji created FLINK-23899:
-

 Summary: Translate the "Elastic Scaling" page into chinese
 Key: FLINK-23899
 URL: https://issues.apache.org/jira/browse/FLINK-23899
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation, Project Website
Affects Versions: 1.13.2
Reporter: Wenhao Ji


Translate [the "Elastic Scaling" 
page|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/]
 into Chinese.

The original documentation in English can be found at 
docs/content/docs/deployment/elastic_scaling.md.





[jira] [Commented] (FLINK-17104) Support registering custom JobStatusListeners from config

2021-08-17 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400743#comment-17400743
 ] 

Wenhao Ji commented on FLINK-17104:
---

It has been a while since I opened the discussion about this feature. I hope 
everyone watching this ticket can participate in the [discussion on the mailing 
list|https://lists.apache.org/list.html?d...@flink.apache.org:2020-9:JobStatusListeners].
I have also created a 
[POC|https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7]
for this and would appreciate any suggestions about it.

> Support registering custom JobStatusListeners from config
> -
>
> Key: FLINK-17104
> URL: https://issues.apache.org/jira/browse/FLINK-17104
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Core
>Reporter: Canbin Zheng
>Priority: Minor
>  Labels: pull-request-available, stale-minor
>
> Currently, a variety of users are asking for support for registering custom 
> JobStatusListeners to get timely feedback on the status transitions of 
> their jobs. This could be an important feature for effective Flink cluster 
> monitoring systems.
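
One way to picture "registering listeners from config" is the rough sketch below. It is only an assumption about how such a feature could look, not the design of the POC or of Flink itself: the `JobStatusListener` interface here is a hypothetical stand-in with simplified `String` parameters, and the comma-separated class-name list is an invented config format.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: instantiate listeners named in a config value via reflection. */
final class JobStatusListenerLoaderSketch {

    private JobStatusListenerLoaderSketch() {}

    /** Hypothetical callback interface; Flink's real listener interface may differ. */
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus, long timestamp);
    }

    /** Example listener that simply remembers the last status it was told about. */
    static final class RecordingListener implements JobStatusListener {
        String lastStatus;

        @Override
        public void jobStatusChanges(String jobId, String newStatus, long timestamp) {
            lastStatus = newStatus;
        }
    }

    /**
     * Parses a comma-separated list of class names (an assumed config format) and
     * creates one instance per class using its no-argument constructor.
     */
    static List<JobStatusListener> loadListeners(String commaSeparatedClassNames) {
        List<JobStatusListener> listeners = new ArrayList<>();
        if (commaSeparatedClassNames == null || commaSeparatedClassNames.trim().isEmpty()) {
            return listeners;
        }
        for (String name : commaSeparatedClassNames.split(",")) {
            try {
                Class<?> clazz = Class.forName(name.trim());
                listeners.add(
                        clazz.asSubclass(JobStatusListener.class)
                                .getDeclaredConstructor()
                                .newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load listener: " + name, e);
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        List<JobStatusListener> listeners = loadListeners(RecordingListener.class.getName());
        // Notify every configured listener about a status transition.
        for (JobStatusListener listener : listeners) {
            listener.jobStatusChanges("job-1", "RUNNING", System.currentTimeMillis());
        }
        System.out.println("Loaded listeners: " + listeners.size());
    }
}
```

Loading by class name keeps the cluster configuration as the single place where monitoring hooks are declared, at the cost of requiring each listener to have a no-argument constructor.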





[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-08-09 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396468#comment-17396468
 ] 

Wenhao Ji commented on FLINK-22452:
---

[~nkruber-old], [~pnowojski], I opened the PR 
[#16746|https://github.com/apache/flink/pull/16746] according to your 
suggestions. Would you help me review the change?

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Assignee: Wenhao Ji
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Currently, the "transactional.id"s of the Kafka producers in 
> FlinkKafkaProducer are generated based on the task name. This mechanism has 
> some limitations:
>  * The generated ID will exceed Kafka's limit if the task name is too long. (resolved in FLINK-17691)
>  * The IDs will very likely clash with each other if the job topologies are similar. (discussed in FLINK-11654)
>  * Only certain "transactional.id" values may be authorized by [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on the target Kafka cluster.
> Besides, the Spring community has introduced the 
> [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
>  method to their Kafka client.
> Therefore, I think it is necessary to have this feature in the Flink 
> Kafka connector. 
>  
> As discussed in FLINK-11654, the possible solutions are:
>  * either introduce an additional method called setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
>  * or use the existing "transactional.id" property as the prefix.
>  The behavior of the "transactional.id" generation will be:
>  * keep the current behavior if the prefix is absent,
>  * use the prefix, if present, as the prefix for the TransactionalIdsGenerator.





[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-08-04 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392939#comment-17392939
 ] 

Wenhao Ji commented on FLINK-22452:
---

Shall we have a dedicated section, `transactional.id` for example, under the 
"Kafka Producer" section, following "The SerializationSchema" section?



[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-08-04 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392933#comment-17392933
 ] 

Wenhao Ji commented on FLINK-22452:
---

Sure! Sorry for missing that. Let me update the documentation.



[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2021-07-04 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374266#comment-17374266
 ] 

Wenhao Ji commented on FLINK-11654:
---

Hi everyone. I would like to share [the link to the 
vote|https://lists.apache.org/thread.html/r5c69f2f8467637290b3607fdbb8e7e2b59be54705e3d22ec5d123683%40%3Cdev.flink.apache.org%3E]
 on 
[FLIP-172|https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer]
 here, which aims to support a custom transactional.id prefix to solve this 
issue. I hope you will participate in the vote and the 
[discussion|https://lists.apache.org/thread.html/r67610aa2d4dfdaf3b027b82edd1a3f46771f0d58902a4258d931e5a5%40%3Cdev.flink.apache.org%3E]!

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> +                       // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
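
The clash described in this ticket can be illustrated with a small, self-contained sketch. It does not use Flink's real TransactionalIdsGenerator. The class below is a simplified stand-in that assumes IDs are formed as prefix + "-" + number, and that two jobs with identically named sinks end up with the same task name and operator ID. The job ID values are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only; a simplified stand-in for Flink's TransactionalIdsGenerator. */
final class TransactionalIdClashSketch {

    private TransactionalIdClashSketch() {}

    /** Builds the ID pool of one subtask as prefix + "-" + number (simplified assumption). */
    static Set<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + ((long) subtaskIndex * poolSize + i));
        }
        return ids;
    }

    /** Returns true if the two pools share at least one transactional ID. */
    static boolean clash(Set<String> a, Set<String> b) {
        Set<String> common = new HashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        // Same task name and operator ID in both jobs: the prefix is identical.
        String sharedPrefix = "Sink: kafka" + "-" + "operator-1";
        Set<String> jobA = idsFor(sharedPrefix, 0, 5);
        Set<String> jobB = idsFor(sharedPrefix, 0, 5);
        System.out.println("Same prefix clashes: " + clash(jobA, jobB));

        // Adding a per-job component (hypothetical job IDs) separates the pools.
        Set<String> jobA2 = idsFor("job-a-" + sharedPrefix, 0, 5);
        Set<String> jobB2 = idsFor("job-b-" + sharedPrefix, 0, 5);
        System.out.println("Job-scoped prefix clashes: " + clash(jobA2, jobB2));
    }
}
```

When two producers use the same transactional ID, the broker fences the older one, which matches the ProducerFencedException in the log excerpt above.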
>  
>  





[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-07-04 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374265#comment-17374265
 ] 

Wenhao Ji commented on FLINK-22452:
---

Hi, everyone. I opened a [vote 
thread|https://lists.apache.org/thread.html/r5c69f2f8467637290b3607fdbb8e7e2b59be54705e3d22ec5d123683%40%3Cdev.flink.apache.org%3E]
 for this feature and hope you can participate in the vote. If you have any 
questions or concerns, please feel free to join us in this [discussion 
thread|https://lists.apache.org/thread.html/r67610aa2d4dfdaf3b027b82edd1a3f46771f0d58902a4258d931e5a5%40%3Cdev.flink.apache.org%3E]
 and do not hesitate to share your ideas.

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-06-17 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364798#comment-17364798
 ] 

Wenhao Ji commented on FLINK-22452:
---

Hi, [~mdemierre]. I've just created the FLIP for this. Let us discuss this 
feature in this thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-172-Support-custom-transactional-id-prefix-in-FlinkKafkaProducer-td51355.html]

> Support specifying custom transactional.id prefix in FlinkKafkaProducer
> ---
>
> Key: FLINK-22452
> URL: https://issues.apache.org/jira/browse/FLINK-22452
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Wenhao Ji
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2021-04-25 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331465#comment-17331465
 ] 

Wenhao Ji commented on FLINK-11654:
---

[~becket_qin], sure thing! I have raised an improvement ticket. Let's discuss 
this proposal and move the feature forward in 
[FLINK-22452|https://issues.apache.org/jira/browse/FLINK-22452]. 

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
>  Labels: stale-assigned
>


[jira] [Created] (FLINK-22452) Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-04-25 Thread Wenhao Ji (Jira)
Wenhao Ji created FLINK-22452:
-

 Summary: Support specifying custom transactional.id prefix in 
FlinkKafkaProducer
 Key: FLINK-22452
 URL: https://issues.apache.org/jira/browse/FLINK-22452
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.12.2
Reporter: Wenhao Ji


Currently, the "transactional.id"s of the Kafka producers in FlinkKafkaProducer 
are generated based on the task name. This mechanism has some limitations:
 * The generated ID will exceed Kafka's limit if the task name is too long. (resolved in FLINK-17691)
 * The IDs will very likely clash with each other if the job topologies are similar. (discussed in FLINK-11654)
 * Only certain "transactional.id" values may be authorized by [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on the target Kafka cluster.

Besides, the Spring community has introduced the 
[setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)]
 method to their Kafka client.

Therefore, I think it is necessary to have this feature in the Flink Kafka 
connector. 

As discussed in FLINK-11654, the possible solutions are:
 * either introduce an additional method called setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
 * or use the existing "transactional.id" property as the prefix.

 The behavior of the "transactional.id" generation will be:
 * keep the current behavior if the prefix is absent,
 * use the prefix, if present, as the prefix for the TransactionalIdsGenerator.
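
The proposed behavior could be sketched roughly as follows. This is only an illustration under stated assumptions, not the eventual implementation: the class and method names are hypothetical, the fallback of task name plus operator ID is taken from the code quoted in FLINK-11654, and treating an empty string as "absent" is an extra assumption.

```java
import java.util.Optional;

/** Hypothetical helper, not part of Flink: picks the prefix for transactional.id generation. */
final class TransactionalIdPrefixSketch {

    private TransactionalIdPrefixSketch() {}

    /**
     * Returns the user-supplied prefix if it is present and non-empty. Otherwise falls
     * back to taskName + "-" + operatorUniqueId, i.e. the existing behavior is kept.
     */
    static String resolvePrefix(String userPrefix, String taskName, String operatorUniqueId) {
        return Optional.ofNullable(userPrefix)
                .filter(p -> !p.isEmpty())
                .orElse(taskName + "-" + operatorUniqueId);
    }

    /** Illustrative only: appends a numeric ID to the resolved prefix. */
    static String transactionalId(String prefix, long id) {
        return prefix + "-" + id;
    }

    public static void main(String[] args) {
        // No custom prefix: the default derived from the task is used.
        System.out.println(transactionalId(resolvePrefix(null, "Sink: kafka", "op-1"), 0));
        // Custom prefix, e.g. one that a prefixed ACL on the cluster authorizes.
        System.out.println(transactionalId(resolvePrefix("team-a", "Sink: kafka", "op-1"), 0));
    }
}
```

Keeping the fallback means existing jobs are unaffected, while jobs running against clusters with prefixed ACLs can opt in to an authorized prefix.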





[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2021-04-12 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319212#comment-17319212
 ] 

Wenhao Ji commented on FLINK-11654:
---

Hi, [~aljoscha] and [~freezhan]. Do we have any update on this issue? Correct 
me if I'm wrong, but since we have come to the conclusion that 
transactionIdPrefix is going to be exposed in FlinkKafkaProducer, I am 
wondering if I can file a dedicated ticket to track this improvement. This 
feature is quite important to us, since the Kafka producers managed by our 
platform are only authorized to use certain transactional.id prefixes to avoid 
conflicts, much like what [~knaufk] described. Otherwise it might be pretty 
hard to use these producers on Flink. By the way, if we are only exposing the 
transactionIdPrefix, do we still need a FLIP?

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: freezhan
>Priority: Major
>