Thank you, Fabian. We are waiting for FLINK-18450 <https://issues.apache.org/jira/browse/FLINK-18450> (watermark alignment) before switching to KafkaSource; currently we use extra logic on top of FlinkKafkaConsumer to support watermark alignment.

Thanks,
Alexey

________________________________
From: Fabian Paul <fp...@apache.org>
Sent: Friday, January 14, 2022 4:02 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: FlinkKafkaConsumer and FlinkKafkaProducer and Kafka Cluster Migration

Hi Alexey,

The bootstrap servers are not part of the state, so you are good to go. Please stop all your jobs with a savepoint, though, and resume from it with the new properties. To migrate the FlinkKafkaConsumer to an empty topic, I guess you can discard the state, provided you ensure that all messages beginning from the latest checkpointed offset are in the new topic.

Please be aware that we deprecated the FlinkKafkaConsumer and FlinkKafkaProducer with Flink 1.14 in favor of the KafkaSource and KafkaSink. In the future, we plan to remove both, and they will not receive further updates.

Best,
Fabian

On Fri, Jan 14, 2022 at 3:08 AM Alexey Trenikhun <yen...@msn.com> wrote:
>
> Hello,
>
> Currently we are using FlinkKafkaConsumer and FlinkKafkaProducer and are
> planning to migrate to a different Kafka cluster. Are the bootstrap servers,
> username and passwords part of the FlinkKafkaConsumer and FlinkKafkaProducer
> state? So if we take a savepoint, change the bootstrap servers and
> credentials, and start the job from the savepoint, will it use the new
> connection properties or the old ones from the savepoint?
> Assuming that we are connected to the new Kafka cluster, I think that the
> FlinkKafkaConsumer offsets will be reset, because the new Kafka cluster will
> be empty and FlinkKafkaConsumer will not be able to seek to the stored
> offsets. Am I right?
>
> Thanks,
> Alexey
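
To illustrate the point above that the bootstrap servers are not part of the state: the connection properties are whatever the job is given when it starts, so they can be built from external settings and changed between the savepoint and the restart. The following is only a minimal sketch under that assumption. The class KafkaConnectionProps and the method fromSettings are hypothetical names, not part of Flink or Kafka; the property keys are the standard Kafka client configuration keys.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: builds Kafka client properties from settings supplied at job start. */
final class KafkaConnectionProps {

    private KafkaConnectionProps() {}

    /**
     * Builds the connection properties from externally supplied values.
     *
     * @param settings hypothetical map of values, e.g. parsed from job arguments
     */
    static Properties fromSettings(Map<String, String> settings) {
        String servers = settings.get("bootstrap.servers");
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("bootstrap.servers must be set");
        }

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);

        // Copy optional settings (credentials, consumer group) only when they are present.
        String[] optionalKeys = {
            "security.protocol", "sasl.mechanism", "sasl.jaas.config", "group.id"
        };
        for (String key : optionalKeys) {
            String value = settings.get(key);
            if (value != null) {
                props.setProperty(key, value);
            }
        }
        return props;
    }
}
```

The resulting Properties object could then be handed to the FlinkKafkaConsumer constructor, or to the KafkaSource builder via setProperties after a later migration. Restarting from the savepoint with different settings would then pick up the new cluster address and credentials.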