Hi Efrat,
1. Please add the default value of scan.topic-integrity-check.enabled to
the public APIs. The default should preserve the current behavior.
2. I don't understand why `topic-pattern` is out of scope. If a topic is
dropped and then recreated, I think every source subscription mode should
keep the same behavior (still reading new topics or throwing exceptions,
based on scan.topic-integrity-check.enabled).
3. Same as Savonin, I am also curious about state migration.
4. Regarding "partition discovery must be enabled
(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive
number)": I don't think scan.topic-integrity-check.enabled should be
bound to partition discovery. Even with PARTITION_DISCOVERY_INTERVAL_MS =
-1, each job restart will still try to discover new partitions, and thus
still needs to check topic integrity.
5. What happens if the user: 1. removes the topic from `topic` or
`topic-pattern`, 2. removes and recreates the topic in Kafka, 3. on the
next start, re-adds this `topic` or `topic-pattern`? And what is the state
change at each step?

Best,
Hongshun

On Mon, May 4, 2026 at 8:10 PM Aleksandr Savonin <[email protected]>
wrote:

> Hi Efrat,
> Thanks for the amazing work, the update, and for descoping the FLIP! I
> think the new direction looks much cleaner.
> I have a few follow-up questions:
> 1. State migration: restoring from a pre-FLIP checkpoint. What happens
> when a user upgrades to a connector version that supports this FLIP,
> enables enableTopicIntegrityCheck(), and resumes from an existing
> checkpoint that has no persisted topic IDs?
> 2. Could you please clarify about excluded topic-pattern subscribers?
> The FLIP rationale ("pattern-based subscriptions inherently accept any
> matching topic") justifies tolerating new topics that begin matching
> the pattern, but I don't think it justifies accepting a
> previously-known topic being deleted and recreated under the same
> name. From a user's perspective, a checkpointed offset for a
> previously-known topic is just as invalid after recreation regardless
> of whether the topic was reached via a list or a pattern. Could
> pattern subscribers reuse the same logic to allow new names freely,
> but enforce ID stability for already-known names?
> 3. Where will topic IDs live in the enumerator state? I think the FLIP
> would benefit from a concrete data-model sketch.
> 4. Bounded sources: Doesn't KafkaSourceBuilder override
> partition.discovery.interval.ms to -1 whenever setBounded(...) is
> called, with no opt-out? The FLIP states that with discovery disabled,
> the integrity check runs only once on startup. For long-running
> bounded jobs (e.g. backfill against a snapshot up to `latest()`),
> recreation mid-job is exactly the scenario this FLIP exists to detect,
> and a single startup check feels insufficient. Should bounded sources
> be allowed to opt into periodic checks when integrity verification is
> enabled, or is this an explicitly accepted limitation?  Maybe Im
> missing something.
>
> Thanks again,
> Aleksandr Savonin
>
> On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
> >
> > Hi everyone,
> > Following the feedback and offline discussions, I came to realize that
> > descoping FLIP-562 to source topics will have a better ROI: source
> > topic integrity presents a pressing risk to job recovery, while sink
> > topic recreation, though inflicting inevitable incompleteness, does not.
> > I'd like to defer the sink topic integrity work until a sink coordinator
> > is introduced, which can be discussed orthogonally to FLIP-562.
> >
> > I updated the FLIP to reflect the API changes suggested by Rui and
> > Leonard, so a user is no longer asked to provide a topicId [1], and
> > descoped it to source topics, with a note about sink topics [2].
> >
> > I'd appreciate your feedback on the updated proposal.
> >
> > Thanks,
> > Efrat.
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
> > [2]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
> >
> >
> > On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]> wrote:
> >
> > > Hi everyone, thank you for reviewing the FLIP.
> > >
> > > > 1. Manually specifying the topic ID seems to be a bit costly for
> > > > users. Why does it require flink users to pass the topic id manually
> > > > or explicitly? It would be helpful for users if flink source or sink
> > > > are able to fetch topic id automatically in the beginning, right?
> > >
> > > Thanks Rui for your suggestion.
> > > Topic ID auto-retrieval indeed sounds like better UX; I will update
> > > the FLIP accordingly.
> > > However, a downside to be aware of is that the topic-name-to-ID
> > > mapping must be checkpointed to retain the same ID across recoveries,
> > > which inflicts statefulness on so-far stateless operators like the
> > > non-transactional sink.
> > >
> > > > 2. On the Sink side, the proposal uses
> > > > "setPartitionDiscoveryIntervalMs". Since "Partition Discovery" in
> > > > Flink implies Source behavior (finding splits), and the Sink is doing
> > > > a metadata update, have you considered "setMetadataRefreshIntervalMs"?
> > > > Maybe that aligns better with the underlying Kafka Client terminology
> > > > (Metadata Fetch/Lookup/Refresh).
> > >
> > > > introducing “Partition Discovery” for sinks feels conceptually
> > > > inconsistent with Flink’s existing design.
> > >
> > > Thanks Aleksandr and Leonard.
> > > The concept of partition discovery isn’t tied to the source.
> > > It was introduced to address increases in partition count, which may
> > > happen on both source and sink topics.
> > > Assigning splits to the newly found partitions is a source
> > > implementation detail, and is also not in the docs.
> > > I decided to advertise the feature as "sink partition discovery" to
> > > help folks already familiar with the concept better understand the
> > > implications.
> > > What do you think?
> > >
> > > > 3. Is it correct that enabling partition discovery is strictly
> > > > required for the integrity check?
> > >
> > > The integrity verification is performed upon metadata refresh.
> > > On both source and sink, disabling partition discovery means metadata
> > > isn't refreshed, so the integrity check will only be performed once on
> > > startup.
> > >
> > > > 4. The disclaimer mentions an "inevitable short period of time"
> > > > where the job reads/writes to the new topic before detection. If a
> > > > checkpoint completes successfully during this window, do we risk
> > > > having a corrupted state? Is this a known limitation?
> > >
> > > The risk of data corruption upon topic recreation is discussed in the
> > > FLIP. We cannot save users from themselves; after all, if they decide
> > > to recreate a topic, the data is lost.
> > > Indeed, the proposed check cannot guarantee 100% protection due to its
> > > periodic nature.
> > > Nevertheless, the job will fail shortly after, and jobs restored from
> > > that checkpoint will not be allowed to start.
> > > For more sensitive jobs you could decrease the interval (trading off
> > > performance).
> > >
> > > > 5. Will there be metrics exposed for monitoring the integrity checks?
> > > We will not add metrics for integrity checks, as a failure immediately
> > > triggers a global unrecoverable failure.
> > >
> > > Efrat
> > >
> > > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
> > >
> > >> Hi Efrat,
> > >>
> > >> Thanks for kicking off this discussion — I’m also in favor of adding a
> > >> check for the Kafka Connector.
> > >>
> > >>
> > >> > My only concern is that manually specifying the topic ID seems to
> > >> > be a bit costly for users. Why does it require flink users to pass
> > >> > the topic id manually or explicitly? It would be helpful for users
> > >> > if flink source or sink are able to fetch topic id automatically in
> > >> > the beginning, right?
> > >>
> > >> I share Rui’s concern about requiring users to manually specify the
> > >> topicId. Given that topicId-based communication isn’t part of Kafka’s
> > >> current roadmap (as noted in your “Future Plan” section) and is only
> > >> starting to become available in Kafka 4.0–4.2, it seems premature—and
> > >> potentially burdensome—to expose it in Flink’s user-facing API.
> > >> Ideally, if needed at all, the source or sink should be able to
> > >> resolve the topicId automatically during initialization.
> > >>
> > >> Additionally, I agree with Aleksandr that introducing “Partition
> > >> Discovery” for sinks feels conceptually inconsistent with Flink’s
> > >> existing design.
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >> >
> > >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]>
> > >> > wrote:
> > >> >
> > >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to
> > >> >> implement topic integrity checks in the Kafka connector, as
> > >> >> currently the connector is blind to topic recreations, presenting
> > >> >> risks to job consistency.
> > >> >>
> > >> >> Kafka APIs traditionally rely on topic names, which do not
> > >> >> guarantee uniqueness over time.
> > >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId-based
> > >> >> communication, client support is not on the roadmap [4].
> > >> >>
> > >> >> The FLIP contains the new proposal to make the Flink Kafka
> > >> >> connector sensitive to topicId changes through integrity checks
> > >> >> over the periodic metadata fetching (AKA topic partition
> > >> >> discovery), and sets the grounds for future topicId-based
> > >> >> communication with both the user and the Kafka server.
> > >> >>
> > >> >> I'd appreciate your feedback on the proposed changes.
> > >> >>
> > >> >> Thanks,
> > >> >> Efrat.
> > >> >>
> > >> >> [1]
> > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
> > >> >>
> > >> >> [2]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> > >> >>
> > >> >> [3]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
> > >> >>
> > >> >> [4]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
> > >> >>
> > >>
> > >>
>
>
>
> --
> Kind regards,
> Aleksandr
>
