Hey all,
If no further concerns are raised in the upcoming week I will open the
FLIP for voting.
Thank you for your feedback,
Efrat

On Sun, 17 May 2026 at 20:24, Efrat Levitan <[email protected]> wrote:
>
> Hi Hongshun and Aleksandr
> Thank you for taking another pass at the FLIP.
> I updated it with additional context regarding state, testing and the default 
> state of the feature (disabled)
>
> State & Migration
> I plan to introduce a `topicIntegrityMapping` property to 
> `KafkaSourceEnumState`,
> When topic integrity is enabled, that field will be used for topic name -> id 
> mapping
> Otherwise the map is empty, so there's no excessive state by default.
> Additionally if the user enables the feature and later decides to disable it 
> for a job, the additional state is wiped.
> I plan to have migration tests to KafkaSourceEnumStateSerializerTest to 
> ensure backward compatibility.
>
> Pattern-based subscription
> Considering the new mechanism for obtaining topicIds from kafka server I 
> agree we can include pattern based subscription in the FLIP scope.
> I removed it from the out of scope section.
>
> Bounded sources
> As pointed out in the FLIP, continuous integrity is coupled to partition 
> discovery, because it reuses the fetched metadata, decoupling the two will 
> come with a trade off of an additional periodic metadata call.
> However the 1st metadata call during job startup (from either scratch or 
> recovery) is performed regardless, so for a bounded source, topic integrity 
> check is performed at least once if enabled. (and never otherwise)
>
> > 5. What happens if the user:  1. remove the topic from `topic` or
> `topic-pattern`  2. remove and recreate the topic in kafka   3.  In the
> next start, re-add this `topic` or `topic-pattern`? And what the state
> change in each step?
>
> You are asking about tracking Ids for topics the app no longer uses.
> Reliability-wise we could claim that topics that were used by the job should 
> be verified forever.
> However while topic recreation is a common scenario, expecting the app to 
> track a growing list of unused topics is an expensive corner case to handle
> Implying multiple metadata calls per interval in case of pattern based 
> subscription, or if the cluster was replaced.
> Therefore to the question, if a user removes topics from the job source 
> declaration, topic integrity will no longer track them, hence after step 3 
> the job will not fail.
> State changes (given the job committed at least 1 checkpoint before/while 
> stopped, otherwise the state will not be refreshed to catch up with the new 
> configurations):
> 1: Topic name -> id entry for the topic is removed from the checkpointed 
> state (as irrelevant)
> 2: -
> 3: A new topic name -> id entry is stored for the registered topic
> Eager to know your thoughts on this.
>
> Thanks,
> Efrat
>
>
> On Wed, 6 May 2026 at 10:59, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Efrat,
>> 1. Please add the default value of scan.topic-integrity-check.enabled in
>> the public apis. The default value should be the same with current behavior.
>> 2. I don't know why `Topic pattern` is out of pattern? If a topic is
>> dropped and then created, I think every source pattern keeps the same
>> behavior(still read new topics or throwing exceptions based on
>> scan.topic-integrity-check.enabled)
>> 3. Same as Savonin , I am also curious about State migration.
>> 4. "partition discovery must be enabled
>> (KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive
>> number)". I don't think scan.topic-integrity-check.enabled) should be
>> bonded together with partition discovery .  Even
>> PARTITION_DISCOVERY_INTERVAL_MS = -1, each time restart the job, still will
>> try to discover new partitions, thus still need to check topic integrity.
>> 5. What happens if the user:  1. remove the topic from `topic` or
>> `topic-pattern`  2. remove and recreate the topic in kafka   3.  In the
>> next start, re-add this `topic` or `topic-pattern`? And what the state
>> change in each step?
>>
>> Best,
>> Hongshun
>>
>> On Mon, May 4, 2026 at 8:10 PM Aleksandr Savonin <[email protected]>
>> wrote:
>>
>> > Hi Efrat,
>> > Thanks for the amazing work on the FLIP, the update and for descoping
>> > the FLIP! I think the new direction looks much cleaner.
>> > I have a few follow-up questions:
>> > 1. State migration: restoring from a pre-FLIP checkpoint. What happens
>> > when a user upgrades to a connector version that supports this FLIP,
>> > enables enableTopicIntegrityCheck(), and resumes from an existing
>> > checkpoint that has no persisted topic IDs?
>> > 2. Could you please clarify about excluded topic-pattern subscribers?
>> > The FLIP rationale ("pattern-based subscriptions inherently accept any
>> > matching topic") justifies tolerating new topics that begin matching
>> > the pattern, but I don't think it justifies accepting a
>> > previously-known topic being deleted and recreated under the same
>> > name. From a user's perspective, a checkpointed offset for a
>> > previously-known topic is just as invalid after recreation regardless
>> > of whether the topic was reached via a list or a pattern. Could
>> > pattern subscribers reuse the same logic to allow new names freely,
>> > but enforce ID stability for already-known names?
>> > 3. Where will topic IDs live in the enumerator state? I think the FLIP
>> > would benefit from a concrete datamodel "sketch".
>> > 4. Bounded sources: Doesn't KafkaSourceBuilder override
>> > partition.discovery.interval.ms to -1 whenever setBounded(...) is
>> > called, with no opt-out? The FLIP states that with discovery disabled,
>> > the integrity check runs only once on startup. For long-running
>> > bounded jobs (e.g. backfill against a snapshot up to `latest()`),
>> > recreation mid-job is exactly the scenario this FLIP exists to detect,
>> > and a single startup check feels insufficient. Should bounded sources
>> > be allowed to opt into periodic checks when integrity verification is
>> > enabled, or is this an explicitly accepted limitation?  Maybe Im
>> > missing something.
>> >
>> > Thanks again,
>> > Aleksandr Savonin
>> >
>> > On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
>> > >
>> > > Hi everyone,
>> > > Following the feedback and offline discussions, I came to realize
>> > descoping
>> > > FLIP-562 to source topics will have better ROI.
>> > > As source topic integrity presents the pressing risk to job recovery, and
>> > > sink topics recreation, while inflicting inevitable incompleteness, does
>> > > not.
>> > > I'd like to defer the sink topics integrity work until a sink coordinator
>> > > is introduced, which can be discussed orthogonal to FLIP-562.
>> > >
>> > > I updated the FLIP to reflect the API changes suggested by Rui and
>> > Leonard,
>> > > so a user is not asked to provide a topicId anymore [1], and descoped to
>> > > source topics, with a note about sink topics [2].
>> > >
>> > > I'd appreciate your feedback on the updated proposal.
>> > >
>> > > Thanks,
>> > > Efrat.
>> > >
>> > > [1]
>> > >
>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
>> > > [2]
>> > >
>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
>> > >
>> > >
>> > > On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]>
>> > wrote:
>> > >
>> > > > Hi everyone, thank you for reviewing the FLIP.
>> > > >
>> > > > > 1. Manually specifying the topic ID seems to be
>> > > > > a bit costly for users. Why does it require flink users to pass the
>> > topic
>> > > > > id
>> > > > > manually or explicitly? It would be helpful for users if flink
>> > source or
>> > > > > sink
>> > > > > are able to fetch topic id automatically in the beginning, right?
>> > > >
>> > > > Thanks Rui for your suggestion.
>> > > > Topic ID auto-retrieval indeed sounds like a better UX, I will update
>> > the
>> > > > FLIP accordingly
>> > > > However a downside to be aware of is that topic names to IDs mapping
>> > must
>> > > > be checkpointed to retain the same ID across recoveries,
>> > > > inflicting statefulness on so-far stateless operators like the
>> > > > non-transactional sink.
>> > > >
>> > > > > 2. On the Sink side, the proposal uses
>> > "setPartitionDiscoveryIntervalMs".
>> > > > > Since "Partition Discovery" in Flink implies Source behavior (finding
>> > > > > splits), and the Sink is doing a metadata update, have you considered
>> > > > > "setMetadataRefreshIntervalMs"? Maybe that aligns better with the
>> > > > > underlying Kafka Client terminology (Metadata Fetch/Lookup/Refresh).
>> > > >
>> > > > >  introducing “Partition Discovery” for sinks feels conceptually
>> > > > inconsistent with Flink’s existing design.
>> > > >
>> > > > Thanks Aleksandr and Leonard
>> > > > The concept of partition discovery isn’t tied with source.
>> > > > It was introduced to address the increase of partitions count, which
>> > may
>> > > > happen on both source and sink topics.
>> > > > Assigning splits to the newly found partitions is a source
>> > implementation
>> > > > detail, and is also not on the docs.
>> > > > I decided to advertise the feature as "sink partition discovery" to
>> > help
>> > > > folks already familiar with the concept better understand the
>> > implications.
>> > > > What do you think?
>> > > >
>> > > > > 3. Is it correct that enabling partition discovery is strictly
>> > required
>> > > > for
>> > > > > the integrity check?
>> > > >
>> > > > The integrity verification is performed upon metadata refresh.
>> > > > On both source and sink, disabling partition discovery means metadata
>> > > > isn't refreshed,
>> > > > so integrity check will be performed once on startup.
>> > > >
>> > > > > 4. The disclaimer mentions an "inevitable short period of time"
>> > where the
>> > > > > job reads/writes to the new topic before detection. If a checkpoint
>> > > > > completes successfully during this window, do we risk to have a
>> > corrupted
>> > > > > state? Is this a known limitation?
>> > > >
>> > > > The risk of data corruption upon topic recreation is discussed on the
>> > FLIP.
>> > > > We can not save users from themselves. After all, if they decide to
>> > > > recreate a topic, the data is lost.
>> > > > Indeed the proposed check can not guarantee 100% protection due to its
>> > > > periodical nature.
>> > > > Nevertheless the job will fail shortly after, and jobs restored from
>> > that
>> > > > checkpoint will not be allowed to start.
>> > > > For more sensitive jobs you could obviously decrease the interval,
>> > > > (trading performance)
>> > > >
>> > > > > 5. Will there be metrics exposed for monitoring the integrity checks?
>> > > > We will not measure integrity checks, as a failure immediately
>> > triggers a
>> > > > global unrecoverable failure.
>> > > >
>> > > > Efrat
>> > > >
>> > > > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
>> > > >
>> > > >> Hi Efrat,
>> > > >>
>> > > >> Thanks for kicking off this discussion — I’m also in favor of adding a
>> > > >> check for the Kafka Connector.
>> > > >>
>> > > >>
>> > > >> > My only concern is that manually specifying the topic ID seems to be
>> > > >> > a bit costly for users. Why does it require flink users to pass the
>> > > >> topic
>> > > >> > id
>> > > >> > manually or explicitly? It would be helpful for users if flink
>> > source or
>> > > >> > sink
>> > > >> > are able to fetch topic id automatically in the beginning, right?
>> > > >>
>> > > >> I share Rui’s concern about requiring users to manually specify the
>> > > >> topicId. Given that topicId-based communication isn’t part of Kafka’s
>> > > >> current roadmap (as noted in your “Future Plan” section) and is
>> > starting
>> > > >> available in Kafka 4.0–4.2, it seems premature—and potentially
>> > > >> burdensome—to expose it in Flink’s user-facing API. Ideally, if
>> > needed at
>> > > >> all, the source or sink should be able to resolve the topicId
>> > automatically
>> > > >> during initialization.
>> > > >>
>> > > >> Additionally, I agree with Aleksandr that introducing “Partition
>> > > >> Discovery” for sinks feels conceptually inconsistent with Flink’s
>> > existing
>> > > >> design.
>> > > >>
>> > > >> Best,
>> > > >> Leonard
>> > > >>
>> > > >> >
>> > > >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <
>> > [email protected]>
>> > > >> > wrote:
>> > > >> >
>> > > >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to
>> > > >> implement
>> > > >> >> topic integrity checks on kafka connector, as currently the
>> > connector
>> > > >> is
>> > > >> >> blind to topic recreations, presenting risks to job consistency.
>> > > >> >>
>> > > >> >> Kafka APIs traditionally rely on topic names, which do not
>> > guarantee
>> > > >> >> uniqueness over time.
>> > > >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId based
>> > > >> >> communication, client support is not on the roadmap [4].
>> > > >> >>
>> > > >> >> The FLIP contains the new proposal to make flink kafka connector
>> > > >> sensitive
>> > > >> >> to topicId changes through integrity checks over the periodical
>> > > >> metadata
>> > > >> >> fetching (AKA topic partition discovery), and sets the grounds for
>> > > >> future
>> > > >> >> topicId based communication with both the user and kafka server.
>> > > >> >>
>> > > >> >> I'd appreciate your feedback on the proposed changes.
>> > > >> >>
>> > > >> >> Thanks,
>> > > >> >> Efrat.
>> > > >> >>
>> > > >> >> [1]
>> > > >> >>
>> > > >> >>
>> > > >>
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
>> > > >> >>
>> > > >> >> [2]
>> > > >> >>
>> > > >> >>
>> > > >>
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
>> > > >> >>
>> > > >> >> [3]
>> > > >> >>
>> > > >> >>
>> > > >>
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
>> > > >> >>
>> > > >> >> [4]
>> > > >> >>
>> > > >> >>
>> > > >>
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
>> > > >> >>
>> > > >>
>> > > >>
>> >
>> >
>> >
>> > --
>> > Kind regards,
>> > Aleksandr
>> >

Reply via email to