I'm assuming you have an incompatible schema change. If you don't, there are several easier ways to do this.
Your plan actually looks like the best option. Of course, it requires that you eventually union the inputs. If you can do that without a custom mapper and with a single reader schema, you could even use one source that subscribes to both topics.

For idleness detection, I recommend the new KafkaSource in 1.12.4+. It is built on the new source interface, which supports idleness out of the box.

On Mon, Jul 26, 2021 at 5:52 AM JING ZHANG <beyond1...@gmail.com> wrote:
> Hi Dan,
> Do you plan to continue reading from a new Kafka topic after you have finished
> reading the current Kafka topic?
> If yes, your plan could work.
>
> BTW, if the data in the new Kafka topic has the same schema as the current topic
> and only the topic names differ, you could try the following method, which is simpler:
> 1. Take a savepoint (without drain) of the job after it has finished reading the
> current topic.
> 2. Change the Kafka source's topic name to the new topic name.
> 3. Restore the job from the savepoint.
> After the restore, the job reads the new topic from the earliest offset. Because the
> new topic name differs from the previous one, its KafkaTopicPartitions cannot be
> found in the restored state.
> After the next checkpoint, the restored state is overwritten with the new Kafka topic
> and its offsets.
> Please make sure that the UIDs of the downstream operators do not change.
>
> Best,
> JING ZHANG
>
> On Sun, Jul 25, 2021 at 3:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi!
>>
>> *Scenario*
>> I eventually want to make a breaking change to a Kafka source (a major
>> version change) that requires a new Kafka topic.
>>
>> *Question*
>> What utilities exist in Flink to help with this? What best practices
>> exist?
>>
>> My plan is roughly the following:
>> 1. Change my Flink job to support both Kafka sources and union them. Deal
>> with idle data sources (either temporarily with a flag or by forcing watermark
>> events through).
>> 2. Change the Kafka producer to write to the new topic.
>> 3. When enough time has passed, delete the old operator (using
>> allowNonRestoredState).
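To make the idleness point concrete, here is a toy model in plain Java with no Flink dependency. It is not how Flink implements watermarks internally, and the class, method, and topic names are hypothetical. The model shows why step 1 of the plan needs idleness handling. A union's watermark is the minimum over its inputs. After the producer switches to the new topic, the old topic's watermark stops advancing and would hold the whole job back, unless that input is treated as idle. In Flink itself, you would get this behavior from `WatermarkStrategy#withIdleness(Duration)` on the source rather than from code like this.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Flink's implementation; all names are hypothetical) of how a
 * union combines watermarks. The combined watermark is the minimum over inputs
 * that are still active. An input with no activity for longer than
 * idleTimeoutMs is treated as idle and ignored, so a drained old topic cannot
 * hold the union back.
 */
public class IdleAwareWatermarkUnion {
    private final long idleTimeoutMs;
    private final Map<String, Long> watermarks = new HashMap<>();
    private final Map<String, Long> lastActivityMs = new HashMap<>();
    private long lastCombined = Long.MIN_VALUE;

    public IdleAwareWatermarkUnion(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records a watermark from the named input, observed at processing time nowMs. */
    public void onWatermark(String input, long watermark, long nowMs) {
        watermarks.merge(input, watermark, Math::max); // per-input watermarks only advance
        lastActivityMs.put(input, nowMs);               // any watermark marks the input active again
    }

    /** Combined watermark at nowMs; it never decreases and is unchanged while every input is idle. */
    public long combined(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            boolean idle = nowMs - lastActivityMs.get(e.getKey()) > idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        if (anyActive) {
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkUnion union = new IdleAwareWatermarkUnion(60_000);
        // Both topics are active at first, so the slower one holds the union back.
        union.onWatermark("old-topic", 1_000, 0);
        union.onWatermark("new-topic", 2_000, 0);
        System.out.println(union.combined(0));      // 1000
        // After the producer switch, only new-topic advances; old-topic goes idle.
        union.onWatermark("new-topic", 90_000, 70_000);
        System.out.println(union.combined(70_000)); // 90000
    }
}
```

Without the idle check, the second call would still return 1000, which is the stall described in the plan.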
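JING's explanation of why the restored job starts the renamed topic from the earliest offset can be modeled the same way. This is a plain-Java sketch, not connector code. The class name, the `EARLIEST` sentinel, and the topic names are hypothetical, and the sketch only mirrors the behavior described in the quoted message, so check it against the connector version you actually run. A subscribed partition that is found in the savepoint resumes from its saved offset, and one that is not found falls back to the earliest offset. Only subscribed partitions are carried forward, which is why the old topic disappears from state after the next checkpoint.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Flink code; names are hypothetical) of choosing start offsets
 * when a job is restored from a savepoint after the topic name changed.
 * Partitions are keyed as "topic-partition" strings.
 */
public class RestoreStartOffsets {
    /** Hypothetical sentinel meaning "start from the earliest offset". */
    static final long EARLIEST = -1L;

    static Map<String, Long> resolve(Map<String, Long> restoredState, List<String> subscribedPartitions) {
        Map<String, Long> startOffsets = new LinkedHashMap<>();
        for (String partition : subscribedPartitions) {
            // In the savepoint: resume there. Not in it (e.g. a renamed topic): start from earliest.
            startOffsets.put(partition, restoredState.getOrDefault(partition, EARLIEST));
        }
        // Unsubscribed partitions (the old topic) are not carried forward,
        // so the next checkpoint no longer contains their offsets.
        return startOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> savepoint = Map.of("events-v1-0", 4_200L, "events-v1-1", 3_900L);
        System.out.println(resolve(savepoint, List.of("events-v2-0", "events-v2-1")));
        // {events-v2-0=-1, events-v2-1=-1}
    }
}
```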