[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2021-05-21 Thread freezhan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349586#comment-17349586
 ] 

freezhan commented on FLINK-17691:
--

[~nvolynets] https://issues.apache.org/jira/browse/FLINK-22452

> FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE
> -
>
> Key: FLINK-17691
> URL: https://issues.apache.org/jira/browse/FLINK-17691
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0, 1.11.0
>Reporter: freezhan
>Assignee: John Mathews
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: image-2020-05-14-20-43-57-414.png, 
> image-2020-05-14-20-45-24-030.png, image-2020-05-14-20-45-59-878.png, 
> image-2020-05-14-21-09-01-906.png, image-2020-05-14-21-16-43-810.png, 
> image-2020-05-14-21-17-09-784.png
>
>
> When sinking to Kafka in Semantic.EXACTLY_ONCE mode, the Flink Kafka Connector producer automatically sets the transactional.id, and any user-defined value is ignored.
>  
> When the job's operator name is too long, sends fail: the transactional.id exceeds the Kafka coordinator_key limit.
> !image-2020-05-14-21-09-01-906.png!
>  
> *The Flink Kafka Connector policy for automatic generation of the transactional.id is as follows:*
>  
> 1. Use taskName + "-" + operatorUniqueID as the transactional.id prefix (this may be too long):
> getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID()
> 2. The range of available transactional ids is [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
> !image-2020-05-14-20-43-57-414.png!
> !image-2020-05-14-20-45-24-030.png!
> !image-2020-05-14-20-45-59-878.png!
>  
> *The Kafka transactional.id check policy is as follows:*
>  
> The string's byte length cannot be larger than Short.MAX_VALUE (32767).
> !image-2020-05-14-21-16-43-810.png!
> !image-2020-05-14-21-17-09-784.png!
>  
> *To reproduce this bug, the following conditions must be met:*
>  
>  # send messages to Kafka in exactly-once mode
>  # the combined length of the taskName and operatorUniqueID is larger than 32767 (a very long SQL statement or window expression can cause this)
>  
> *Suggested solutions:*
>  
>      1. Allow users to customize the transactional.id prefix,
> or
>      2. Apply MD5 to the prefix before returning the real transactional.id
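The second suggested solution can be sketched as follows. This is an illustrative helper, not Flink code; the method name `md5Prefix` and the id layout are assumptions. It hashes the unbounded taskName + operatorUniqueID prefix down to a fixed 32-character hex string, so the resulting transactional.id stays far below the 32767-byte limit:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransactionalIdMd5Sketch {

    // Hash an arbitrarily long prefix down to 32 hex chars (MD5 digest).
    static String md5Prefix(String taskName, String operatorUniqueID) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(
                (taskName + "-" + operatorUniqueID).getBytes(StandardCharsets.UTF_8));
            // Left-pad to 32 chars so leading zero bytes are preserved.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Even a pathologically long operator name yields a 32-char prefix.
        String longTaskName = "Source: Custom Source -> Sink: ".repeat(2000);
        String prefix = md5Prefix(longTaskName, "e2b2f358d45860e6d949c8f7417842d6");
        String transactionalId = prefix + "-" + 0; // prefix + pool counter
        System.out.println(transactionalId.length()); // well below Short.MAX_VALUE
    }
}
```

The trade-off noted later in the thread applies: a hashed prefix is short but no longer human-readable in Kafka tooling.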



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-12-28 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255506#comment-17255506
 ] 

Aljoscha Krettek commented on FLINK-17691:
--

Regarding Issues #1 and #2: these were proposed as potential fixes for this problem, but in the end we went with the simplest solution, which is to just truncate the name. The Kafka transactional id and the operator UIDs mentioned in the documentation are not related; the latter are used only for mapping state to operators.

Regarding your Issues #3 and #4, I think you're right. We could make the transactional id *prefix* configurable; it's just that not enough users have asked for it so far. Or anyone, really.
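The truncation fix mentioned here can be sketched roughly as follows. This is an illustrative, byte-aware truncation, not Flink's actual implementation; the helper name is made up, and it assumes the limit applies to the UTF-8 encoded length:

```java
import java.nio.charset.StandardCharsets;

public class TruncateSketch {

    // Truncate s so its UTF-8 encoding fits in maxBytes, without
    // splitting a multi-byte character at the cut point.
    static String truncateToBytes(String s, int maxBytes) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= maxBytes) {
            return s;
        }
        int end = maxBytes;
        // Back up past UTF-8 continuation bytes (10xxxxxx).
        while (end > 0 && (bytes[end] & 0xC0) == 0x80) {
            end--;
        }
        return new String(bytes, 0, end, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String longName = "x".repeat(40_000);
        String safe = truncateToBytes(longName, Short.MAX_VALUE);
        System.out.println(safe.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

Truncating silently does trade away uniqueness guarantees for extremely long names that differ only after the cut point, which is part of why the prefix-configuration idea keeps coming up in this thread.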



[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-12-22 Thread Nazar Volynets (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253484#comment-17253484
 ] 

Nazar Volynets commented on FLINK-17691:


*Issue #4*

Why not give the possibility to specify the `transaction-id` manually? What is the main reason?



[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-12-22 Thread Nazar Volynets (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253463#comment-17253463
 ] 

Nazar Volynets commented on FLINK-17691:


[~freezhan] & [~jmathews3773],

*Issue #4*

+About+

> When sinking to Kafka using the Semantic.EXACTLY_ONCE mode, the Flink Kafka Connector producer will auto-set the transactional.id, and the user-defined value is ignored.

+Details+

There is nothing about this behaviour in the official Flink documentation, or I have missed it.

+Summary+

Should I create a new/separate Jira issue to cover this gap?



[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-12-22 Thread Nazar Volynets (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253462#comment-17253462
 ] 

Nazar Volynets commented on FLINK-17691:


*Issue #3*

And finally, why not give the user the possibility to specify the `transaction-id` (what is the main reason)?

The current auto-generation option brings the following drawbacks:
 # It is useless, or brings a lot of issues, if I am using transactional consumers to drain data from the `sink` topic. Basically, it means that if I update my program, I am then FORCED to reconfigure all my transactional consumers (as I will have a new auto-generated transaction-id).
 # Complicated deployment: I am FORCED to run the Flink application first just to determine the transaction-id, so before that I am blocked from deploying/finally configuring my transactional consumers.

Should I create a new/separate Jira issue for this use case, or is this expected behaviour?
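A deterministic, user-supplied prefix would address both drawbacks, since downstream transactional consumers could be configured before the job ever runs. A minimal sketch of the idea follows; the helper and id layout are illustrative assumptions, not Flink's API, and the pool layout follows the [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize) range described in the issue:

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalIdPoolSketch {

    // Enumerate the transactional ids a job with the given user-chosen
    // prefix would use, so consumers can be configured before deployment.
    static List<String> transactionalIds(String userPrefix, int parallelism, int poolSize) {
        List<String> ids = new ArrayList<>();
        long nextFreeTransactionalId = 0; // illustrative starting offset
        for (long id = nextFreeTransactionalId;
                id < nextFreeTransactionalId + (long) parallelism * poolSize; id++) {
            ids.add(userPrefix + "-" + id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Stable across redeployments because the prefix is user-chosen,
        // not derived from the (possibly renamed) operator chain.
        System.out.println(transactionalIds("orders-etl", 2, 3));
        // → [orders-etl-0, orders-etl-1, orders-etl-2, orders-etl-3, orders-etl-4, orders-etl-5]
    }
}
```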



[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-12-22 Thread Nazar Volynets (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17253451#comment-17253451
 ] 

Nazar Volynets commented on FLINK-17691:


Hi [~freezhan] & [~jmathews3773],

It looks like the issue still exists, or I am missing something. Details below.

 

Basically, I have bumped into two issues:
 * the first is directly related to this one
 * the second is indirectly related

*Issue #1*

+Regarding+

_>> Do md5 on the transactional.id prefix_

+Details+

Flink version:

// build.gradle
{code:java}
ext {
  ...
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  ...
}{code}
// App
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(1);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer consumer = createConsumer(conf);
  FlinkKafkaProducer producer = createProducer(conf);

  env
    .addSource(consumer)
    .uid("kafka-consumer")
    .addSink(producer)
    .uid("kafka-producer")
  ;

  env.execute();
}

public static FlinkKafkaProducer createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App executed/submitted locally from IDE)
{code:java}
2020-12-22 13:52:08 [ForkJoinPool.commonPool-worker-9] INFO  ProducerConfig:347 - ProducerConfig values: 
  ...
  transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24
  ...{code}
+Summary+

As we can see, the transactional.id is not MD5-hashed as stated above (or I am missing something). It looks like the issue should be reopened, as this was expected to be fixed in 1.12.0.

 

*Issue #2*

+Regarding+

> 1. use the taskName + "-" + operatorUniqueID as the transactional.id prefix (may be too long)

In reality, the `uid`s specified after `source` & `sink` are ignored, even though specifying them is highly recommended in the official Flink documentation:

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids]

Moreover, as a workaround there is a possibility to specify the `source` name, and it is NOT ignored.

But there is NO possibility provided by the Flink Java API to specify the `sink` name.

+Details+

// build.gradle
{code:java}
ext {
  ...
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  ...
}{code}
// App - `uid`s are ignored
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(1);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer consumer = createConsumer(conf);
  FlinkKafkaProducer producer = createProducer(conf);

  env
    .addSource(consumer)
    .uid("kafka-consumer") // is ignored
    .addSink(producer)
    .uid("kafka-producer") // is ignored
  ;

  env.execute();
}

public static FlinkKafkaProducer createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App executed/submitted locally from IDE) - `uid`s are ignored
{code:java}
2020-12-22 13:52:08 [Source: Custom Source -> Sink: Unnamed (1/1)#0] INFO  ProducerConfig:347 - ProducerConfig values:  
  ...
  transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
  ...{code}
// App - specify `source`/`sink` names
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(1);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer consumer = createConsumer(conf);
  FlinkKafkaProducer producer = createProducer(conf);

  env
    .addSource(consumer, "kafka-consumer")
    .addSink(producer) // NO way to specify name
  ;

  env.execute();
}

public static FlinkKafkaProducer createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App 

[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-11-19 Thread John Mathews (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235745#comment-17235745
 ] 

John Mathews commented on FLINK-17691:
--

Submitted a fix: https://github.com/apache/flink/pull/14144



[jira] [Commented] (FLINK-17691) FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE

2020-11-19 Thread John Mathews (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235662#comment-17235662
 ] 

John Mathews commented on FLINK-17691:
--

Hey [~aljoscha], I am happy to submit a PR to fix this bug. Can I simply submit one that truncates the taskName? I think we can truncate either in the TransactionIdGenerator or in the TaskInfo constructor itself, depending on whether we want the limit to apply everywhere or not.
