[ https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253451#comment-17253451 ]

Nazar Volynets commented on FLINK-17691:
----------------------------------------

Hi [~freezhan] & [~jmathews3773],

It looks like the issue still exists, or I am missing something. Details below.

 

Basically, I have run into two issues:
 * the first is directly related to this one
 * the second is indirectly related

*Issue #1*

+Regarding+

_>> Do md5 on the transactional.id prefix_

+Details+

Flink version:

// build.gradle
{code:java}
ext {
  ...
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  ...
}{code}
// App
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(10000);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
  FlinkKafkaProducer<Record> producer = createProducer(conf);

  env
      .addSource(consumer)
      .uid("kafka-consumer")
      .addSink(producer)
      .uid("kafka-producer");

  env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App executed/submitted locally from IDE)
{code:java}
2020-12-22 13:52:08 [ForkJoinPool.commonPool-worker-9] INFO  ProducerConfig:347 - ProducerConfig values:
  ...
  transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24
  ...{code}
+Summary+

As we can see, the transactional.id is not MD5-hashed as stated above (or I am missing something). It looks like this issue should be reopened, as the fix was expected in 1.12.0.
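For illustration, the "do md5 on the prefix" approach proposed in this ticket could look like the sketch below. This is a minimal, hypothetical example, not Flink code: the class name `TransactionalIdPrefix`, the method `md5Prefix`, and the sample prefix string are my own. It hashes the arbitrarily long taskName + operatorUniqueID prefix down to a fixed 32-character hex string before the per-subtask suffix is appended:

{code:java}
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransactionalIdPrefix {

  // Hash an arbitrarily long raw prefix (taskName + "-" + operatorUniqueID)
  // down to a fixed 32-character hex string, so the final transactional.id
  // stays well under Kafka's Short.MAX_VALUE (32767) byte limit.
  public static String md5Prefix(String rawPrefix) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest = md5.digest(rawPrefix.getBytes(StandardCharsets.UTF_8));
      // 16-byte digest rendered as a zero-padded 32-char lowercase hex string
      return String.format("%032x", new BigInteger(1, digest));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e); // never on a standard JVM
    }
  }

  public static void main(String[] args) {
    String raw = "Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6";
    // Fixed-length hashed prefix plus the per-subtask suffix:
    System.out.println(md5Prefix(raw) + "-24");
  }
}
{code}

However long the operator names get, the prefix stays 32 hex characters, which is what the log output above does not show.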

 

*Issue #2*

+Regarding+

> 1. use the {color:#ff0000}taskName + "-" + operatorUniqueID{color} as transactional.id prefix (may be too long)

In reality, the `uid` values specified on the `source` & `sink` are ignored, even though specifying them is highly recommended in the official Flink documentation:

[https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids]

Moreover, as a workaround it is possible to specify the `source` name, and it is NOT ignored.

But the Flink Java API provides NO way to specify the `sink` name.

+Details+

// build.gradle
{code:java}
ext {
  ...
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  ...
}{code}
// App - `uid` values are ignored
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(10000);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
  FlinkKafkaProducer<Record> producer = createProducer(conf);

  env
      .addSource(consumer)
      .uid("kafka-consumer") // is ignored
      .addSink(producer)
      .uid("kafka-producer"); // is ignored

  env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App executed/submitted locally from IDE) - `uid` values are ignored
{code:java}
2020-12-22 13:52:08 [Source: Custom Source -> Sink: Unnamed (1/1)#0] INFO  ProducerConfig:347 - ProducerConfig values:
  ...
  transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
  ...{code}

// App - specify `source`/`sink` names
{code:java}
public static void main(String[] args) {
  ...
  env.enableCheckpointing(10000);
  env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  ...
  FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
  FlinkKafkaProducer<Record> producer = createProducer(conf);

  env
      .addSource(consumer, "kafka-consumer")
      .addSink(producer); // NO way to specify name

  env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
  ...
  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(topicDefault,
      new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ...
  return producer;
}{code}
// Logs (of App executed/submitted locally from IDE) - specify `source`/`sink` names
{code:java}
2020-12-22 13:52:08 [Source: kafka-consumer -> Sink: Unnamed (1/1)#0] INFO  ProducerConfig:347 - ProducerConfig values:
  ...
  transactional.id = Source: kafka-consumer -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
  ...{code}
+Summary+

As we can see, the `operatorUniqueID` values specified on the `source` and/or `sink` are ignored.

Moreover, we can specify the `source` name and it is taken into account, but there is no way to do the same for the `sink` via the Flink Java API.

Should I create a new/separate Jira issue for this use case, or is this expected behaviour?
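As a side note, the Kafka-side constraint from the original issue (the transactional.id's byte length must not exceed Short.MAX_VALUE) can be illustrated with a minimal check. This is only a sketch of the rule, not Kafka's actual validation code; the class and method names are my own, and it requires Java 11+ for `String.repeat`:

{code:java}
import java.nio.charset.StandardCharsets;

public class TransactionalIdCheck {

  // Kafka rejects a transactional.id (used as the transaction coordinator key)
  // whose UTF-8 byte length exceeds Short.MAX_VALUE (32767).
  public static boolean fits(String transactionalId) {
    return transactionalId.getBytes(StandardCharsets.UTF_8).length <= Short.MAX_VALUE;
  }

  public static void main(String[] args) {
    String generatedId =
        "Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24";
    System.out.println(fits(generatedId)); // true: short operator names still fit

    // A very long SQL-generated task name can push the id past the limit.
    String hugeTaskName = "SELECT ... ".repeat(4000);
    System.out.println(fits(hugeTaskName + "-operatorId-24")); // false
  }
}
{code}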

> FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-17691
>                 URL: https://issues.apache.org/jira/browse/FLINK-17691
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: freezhan
>            Assignee: John Mathews
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>         Attachments: image-2020-05-14-20-43-57-414.png, 
> image-2020-05-14-20-45-24-030.png, image-2020-05-14-20-45-59-878.png, 
> image-2020-05-14-21-09-01-906.png, image-2020-05-14-21-16-43-810.png, 
> image-2020-05-14-21-17-09-784.png
>
>
> When sinking to Kafka using the {color:#FF0000}Semantic.EXACTLY_ONCE{color} 
> mode, the Flink Kafka Connector Producer will automatically set the 
> {color:#FF0000}transactional.id{color}, and the user-defined value is ignored.
>  
> When the job operator name is too long, sending will fail because the 
> transactional.id exceeds the Kafka {color:#FF0000}coordinator_key{color} limit:
> !image-2020-05-14-21-09-01-906.png!
>  
> *The Flink Kafka Connector policy for automatic generation of 
> transactional.id is as follows:*
>  
> 1. use the {color:#FF0000}taskName + "-" + operatorUniqueID{color} as 
> transactional.id prefix (may be too long)
>   getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext)    
> getRuntimeContext()).getOperatorUniqueID()
> 2. Range of available transactional ids 
> [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * 
> kafkaProducersPoolSize)
> !image-2020-05-14-20-43-57-414.png!
>   !image-2020-05-14-20-45-24-030.png!
> !image-2020-05-14-20-45-59-878.png!
>  
> *The Kafka transactional.id check policy is as follows:*
>  
> {color:#FF0000}the string's byte length can't be larger than Short.MAX_VALUE 
> (32767){color}
> !image-2020-05-14-21-16-43-810.png!
> !image-2020-05-14-21-17-09-784.png!
>  
> *To reproduce this bug, the following conditions must be met:*
>  
>  # send msg to kafka with exactly once mode
>  # the combined length of the TaskName and OperatorUniqueID is larger than 
> 32767 (a very long SQL statement or window statement can cause this)
> *I suggest a solution:*
>  
>      1. Allow users to customize the transactional.id prefix
> or
>      2. Do md5 on the prefix before returning the real transactional.id
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
