[ https://issues.apache.org/jira/browse/FLINK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253451#comment-17253451 ]
Nazar Volynets commented on FLINK-17691:
----------------------------------------

Hi [~freezhan] & [~jmathews3773],

It looks like the issue still exists, or I am missing something. Details below. Basically I have bumped into two issues:
* the first is directly related to this ticket
* the second is indirectly related

*Issue #1*

+Regarding+

_>> Do md5 on the transactional.id prefix_

+Details+

Flink version:

// build.gradle
{code:java}
ext {
    ...
    flinkVersion = '1.12.0'
    scalaBinaryVersion = '2.11'
    ...
}

dependencies {
    ...
    implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    ...
}
{code}

// App
{code:java}
public static void main(String[] args) {
    ...
    env.enableCheckpointing(10000);
    env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    ...
    FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
    FlinkKafkaProducer<Record> producer = createProducer(conf);

    env
        .addSource(consumer)
        .uid("kafka-consumer")
        .addSink(producer)
        .uid("kafka-producer")
    ;

    env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
    ...
    FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(
        topicDefault,
        new RecordKafkaSerSchema(true),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    ...
    return producer;
}
{code}

// Logs (of App executed/submitted locally from IDE)
{code:java}
2020-12-22 13:52:08 [ForkJoinPool.commonPool-worker-9] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24
...
{code}

+Summary+

As we can see, the transactional.id is not an md5 hash as stated above (or I am missing something). It looks like this issue should be reopened, as the fix was expected in 1.12.0.

*Issue #2*

+Regarding+

> 1.
> use the {color:#ff0000}taskName + "-" + operatorUniqueID{color} as transactional.id prefix (may be too long)

In reality, the `uid` values specified after `source` & `sink` are ignored, even though specifying them is highly recommended in the official Flink documentation: [https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids]

Moreover, as a workaround it is possible to specify a `source` name, and it is NOT ignored. But the Flink Java API provides NO way to specify a `sink` name.

+Details+

// build.gradle
{code:java}
ext {
    ...
    flinkVersion = '1.12.0'
    scalaBinaryVersion = '2.11'
    ...
}

dependencies {
    ...
    implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    ...
}
{code}

// App - `uid` values are ignored
{code:java}
public static void main(String[] args) {
    ...
    env.enableCheckpointing(10000);
    env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    ...
    FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
    FlinkKafkaProducer<Record> producer = createProducer(conf);

    env
        .addSource(consumer)
        .uid("kafka-consumer") // is ignored
        .addSink(producer)
        .uid("kafka-producer") // is ignored
    ;

    env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
    ...
    FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(
        topicDefault,
        new RecordKafkaSerSchema(true),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    ...
    return producer;
}
{code}

// Logs (of App executed/submitted locally from IDE) - `uid` values are ignored
{code:java}
2020-12-22 13:52:08 [Source: Custom Source -> Sink: Unnamed (1/1)#0] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
...
{code}

// App - specify `source` name
{code:java}
public static void main(String[] args) {
    ...
    env.enableCheckpointing(10000);
    env.setStateBackend(new RocksDBStateBackend("file:///.../config/checkpoints/rocksdb", true));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    ...
    FlinkKafkaConsumer<Record> consumer = createConsumer(conf);
    FlinkKafkaProducer<Record> producer = createProducer(conf);

    env
        .addSource(consumer, "kafka-consumer")
        .addSink(producer) // NO way to specify name
    ;

    env.execute();
}

public static FlinkKafkaProducer<Record> createProducer(Configuration conf) {
    ...
    FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(
        topicDefault,
        new RecordKafkaSerSchema(true),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    ...
    return producer;
}
{code}

// Logs (of App executed/submitted locally from IDE) - `source` name is used
{code:java}
2020-12-22 13:52:08 [Source: kafka-consumer -> Sink: Unnamed (1/1)#0] INFO ProducerConfig:347 - ProducerConfig values:
...
transactional.id = Source: kafka-consumer -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-20
...
{code}

+Summary+

As we can see, the `operatorUniqueID` values specified after `source` and/or `sink` are ignored. Moreover, a `source` name can be specified and is taken into account, but there is no way to do the same for a `sink` via the Flink Java API.

Should I create a new/separate Jira issue for this use case, or is this expected behaviour?
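The logs above are consistent with the prefix rule quoted from the issue description: the prefix is built from the task name (which is derived from operator *names*), not from `.uid(...)` values. A minimal sketch reproducing the value seen in the logs; the `transactionalIdPrefix` helper is illustrative, not actual Flink code:

```java
public class TransactionalIdPrefixSketch {

    // Mirrors the concatenation quoted in the issue description:
    // taskName + "-" + operatorUniqueID. Illustrative helper, not Flink API;
    // the input values below are copied from the logs above.
    static String transactionalIdPrefix(String taskName, String operatorUniqueID) {
        return taskName + "-" + operatorUniqueID;
    }

    public static void main(String[] args) {
        // The task name contains operator *names* ("kafka-consumer" for the
        // source, "Unnamed" for the sink), not the .uid(...) values.
        String taskName = "Source: kafka-consumer -> Sink: Unnamed";
        String operatorUniqueID = "e2b2f358d45860e6d949c8f7417842d6";
        System.out.println(transactionalIdPrefix(taskName, operatorUniqueID));
        // Source: kafka-consumer -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6
    }
}
```

This is why renaming the source changes the transactional.id while setting uids does not: only the names flow into the task name.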
> FlinkKafkaProducer transactional.id too long when using Semantic.EXACTLY_ONCE
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-17691
>                 URL: https://issues.apache.org/jira/browse/FLINK-17691
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: freezhan
>            Assignee: John Mathews
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>         Attachments: image-2020-05-14-20-43-57-414.png, image-2020-05-14-20-45-24-030.png, image-2020-05-14-20-45-59-878.png, image-2020-05-14-21-09-01-906.png, image-2020-05-14-21-16-43-810.png, image-2020-05-14-21-17-09-784.png
>
> When sinking to Kafka using the {color:#FF0000}Semantic.EXACTLY_ONCE{color} mode, the Flink Kafka Connector Producer automatically sets the {color:#FF0000}transactional.id{color}, and the user-defined value is ignored.
>
> When the job operator name is too long, the send will fail: the transactional.id exceeds the Kafka {color:#FF0000}coordinator_key{color} limit.
>
> !image-2020-05-14-21-09-01-906.png!
>
> *The Flink Kafka Connector policy for automatic generation of transactional.id is as follows:*
>
> 1. use the {color:#FF0000}taskName + "-" + operatorUniqueID{color} as transactional.id prefix (may be too long):
> getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID()
>
> 2. range of available transactional ids: [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
>
> !image-2020-05-14-20-43-57-414.png!
> !image-2020-05-14-20-45-24-030.png!
> !image-2020-05-14-20-45-59-878.png!
>
> *The Kafka transactional.id check policy is as follows:*
>
> {color:#FF0000}the string's byte length can't be larger than Short.MAX_VALUE (32767){color}
>
> !image-2020-05-14-21-16-43-810.png!
> !image-2020-05-14-21-17-09-784.png!
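The check policy quoted above can be sketched as follows. This is a minimal illustration of the limit, assuming the id is encoded as UTF-8; `fitsKafkaStringLimit` is a hypothetical helper, not Kafka API:

```java
import java.nio.charset.StandardCharsets;

public class TransactionalIdLengthCheck {

    // Hypothetical helper illustrating the limit described above: Kafka
    // serializes transactional.id as a length-prefixed string, so its
    // byte length must not exceed Short.MAX_VALUE (32767).
    static boolean fitsKafkaStringLimit(String transactionalId) {
        return transactionalId.getBytes(StandardCharsets.UTF_8).length <= Short.MAX_VALUE;
    }

    public static void main(String[] args) {
        String normalId = "Source: Custom Source -> Sink: Unnamed-e2b2f358d45860e6d949c8f7417842d6-24";
        // Simulate the task name produced by a very long SQL statement.
        String longId = "x".repeat(Short.MAX_VALUE + 1);

        System.out.println(fitsKafkaStringLimit(normalId)); // true
        System.out.println(fitsKafkaStringLimit(longId));   // false
    }
}
```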
> *To reproduce this bug, the following conditions must be met:*
>
> # send msgs to Kafka with exactly-once mode
> # the taskName's length + operatorUniqueID's length is larger than 32767 (a very long line of SQL or window statements can produce this)
>
> *I suggest a solution:*
>
> 1. allow users to customize the transactional.id's prefix
> or
> 2. do md5 on the prefix before returning the real transactional.id
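The second suggested fix above (md5 on the prefix) can be sketched with the JDK's MessageDigest: hashing caps the prefix at 32 hex characters regardless of task name length, keeping the final transactional.id well under the 32767-byte limit. Names here are illustrative, not the actual Flink patch:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5PrefixSketch {

    // Illustrative helper: hash an arbitrarily long prefix down to a fixed
    // 32-character hex string before building the transactional.id.
    static String md5Hex(String prefix) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(prefix.getBytes(StandardCharsets.UTF_8));
        // %032x pads the 16-byte digest to exactly 32 lowercase hex chars.
        return String.format("%032x", new BigInteger(1, digest));
    }

    public static void main(String[] args) throws Exception {
        String longPrefix = "Source: Custom Source -> Sink: Unnamed"
                + "-e2b2f358d45860e6d949c8f7417842d6";
        String hashed = md5Hex(longPrefix);
        System.out.println(hashed.length()); // 32, independent of input length
    }
}
```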