Hi Gordon,

Thanks for your feedback as always.

Why not just Map<KafkaClusterIdentifier, ClusterMetadata>?

I think it makes sense to relocate the bootstrapServer field in the Kafka
properties (ClusterMetadata abstraction), since that is intuitive based
on how it is defined in the Kafka clients library. It also makes the
uniqueness of a cluster clear--it's not a combination of cluster name and
bootstrap server but rather just the cluster name. Bootstrap server(s) can
change over time based on Kafka design (exposing a few brokers or putting
all brokers behind a single entry point).

I'll solicit more feedback on the name in the voting thread, with the 3
options (DynamicKafkaSource, MultiClusterKafkaSource,
DiscoveringKafkaSource). I will start the thread tomorrow after minor
touches on the FLIP based on our last few exchanges!

Best,
Mason

On Wed, Jun 14, 2023 at 1:31 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Mason,
>
> Thanks for addressing my comments. I agree that option 3 seems more
> reasonable.
>
> > Reorganize the metadata in a Map<String, ClusterMetadata> in
> `KafkaStream` where the String is the proposed
> `KafkaClusterIdentifier.name` field.
>
> Why not just Map<KafkaClusterIdentifier, ClusterMetadata>?
>
> Regarding naming, I like DynamicKafkaSource as that's what I immediately
> thought of when reading the FLIP, but I'm not married to the name :)
>
> In principle, it looks like the FLIP is in good shape and generally people
> seem to like the idea of having this connector in Flink.
> I'd be in favor of an official vote to allow this to move forward.
>
> Thanks,
> Gordon
>
> On Mon, Jun 12, 2023 at 1:57 PM Mason Chen <mas.chen6...@gmail.com> wrote:
>
> > >
> > > My main worry for doing this as a later iteration is that this would
> > > probably be a breaking change for the public interface. If that can be
> > > avoided and planned ahead, I'm fine with moving forward with how it is
> > > right now.
> >
> >
> > Make sense. Considering the public interfaces, I think we still want to
> > provide clients the ability to pin certain configurations in the
> > builder--however, cluster specific configurations may not be known
> upfront
> > or generalize to all clusters so there would need to be changes in the
> > `KafkaMetadataService` interface. This could be achieved by exposing via:
> >
> > 1. A separate API (e.g. `Map<KafkaClusterIdentifier, Properties>
> > getKafkaClusterProperties()`) in KafkaMetadataService
> > 2. In `KafkaClusterIdentifier` as this already contains some
> configuration
> > (e.g. Bootstrap server) in which case we should rename the class to
> > something like `KafkaCluster` as it is no longer just an identifier
> > 3. Reorganize the metadata in a Map<String, ClusterMetadata> in
> > `KafkaStream` where the String is the proposed
> > `KafkaClusterIdentifier.name` field.
> >
> > I am preferring option 3 since this simplifies equals() checks on
> > KafkaClusterIdentifier (e.g. is it the name, bootstrap, or both?).
> >
> > Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> > > reader is responsible for discovering and assigning splits from 1+
> > cluster"
> >
> > Thanks for the catch!
> >
> > the defining characteristic is the dynamic discovery vs. the fact that
> > > multiple clusters [...]
> >
> >
> >
> > I think the "Table" in the name of those SQL connectors should avoid
> > > confusion. Perhaps we can also solicit other ideas? I would throw
> > > "DiscoveringKafkaSource" into the mix.
> >
> >  Agreed with Gordon's and your suggestions. Right, the only public facing
> > name for SQL is `kafka` for the SQL connector identifier. Based on your
> > suggestions:
> >
> > 1. MultiClusterKafkaSource
> > 2. DynamicKafkaSource
> > 3. DiscoveringKafkaSource
> > 4. MutableKafkaSource
> > 5. AdaptiveKafkaSource
> >
> > I added a few of my own. I do prefer 2. What do others think?
> >
> > Best,
> > Mason
> >
> > On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi Mason,
> > >
> > > Thanks for the iterations on the FLIP, I think this is in a very good
> > shape
> > > now.
> > >
> > > Small correction for the MultiClusterKafkaSourceEnumerator section:
> "This
> > > reader is responsible for discovering and assigning splits from 1+
> > cluster"
> > >
> > > Regarding the user facing name of the connector: I agree with Gordon
> that
> > > the defining characteristic is the dynamic discovery vs. the fact that
> > > multiple clusters may be consumed in parallel. (Although, as described
> in
> > > the FLIP, lossless consumer migration only works with a strategy that
> > > involves intermittent parallel consumption of old and new clusters to
> > drain
> > > and switch.)
> > >
> > > I think the "Table" in the name of those SQL connectors should avoid
> > > confusion. Perhaps we can also solicit other ideas? I would throw
> > > "DiscoveringKafkaSource" into the mix.
> > >
> > > Cheers,
> > > Thomas
> > >
> > >
> > >
> > >
> > > On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>
> > > wrote:
> > >
> > > > > Regarding (2), definitely. This is something we planned to add
> later
> > on
> > > > but
> > > > so far keeping things common has been working well.
> > > >
> > > > My main worry for doing this as a later iteration is that this would
> > > > probably be a breaking change for the public interface. If that can
> be
> > > > avoided and planned ahead, I'm fine with moving forward with how it
> is
> > > > right now.
> > > >
> > > > > DynamicKafkaSource may be confusing because it is really similar to
> > the
> > > > KafkaDynamicSource/Sink (table connectors).
> > > >
> > > > The table / sql Kafka connectors (KafkaDynamicTableFactory,
> > > > KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal
> > classes
> > > > not really meant to be exposed to the user though.
> > > > It can cause some confusion internally for the code maintainers, but
> on
> > > the
> > > > actual public surface I don't see this being an issue.
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > On Wed, Jun 7, 2023 at 8:55 PM Mason Chen <mas.chen6...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Gordon,
> > > > >
> > > > > Thanks for taking a look!
> > > > >
> > > > > Regarding (1), there is a need from the readers to send this event
> at
> > > > > startup because the reader state may reflect outdated metadata.
> Thus,
> > > the
> > > > > reader should not start without fresh metadata. With fresh
> metadata,
> > > the
> > > > > reader can filter splits from state--this filtering capability is
> > > > > ultimately how we solve the common issue of "I re-configured my
> Kafka
> > > > > source and removed some topic, but it refers to the old topic due
> to
> > > > state
> > > > > *[1]*". I did not mention this because I thought this is more of a
> > > detail
> > > > > but I'll make a brief note of it.
> > > > >
> > > > > Regarding (2), definitely. This is something we planned to add
> later
> > on
> > > > but
> > > > > so far keeping things common has been working well. In that regard,
> > yes
> > > > the
> > > > > metadata service should expose these configurations but the source
> > > should
> > > > > not check it into state unlike the other metadata. I'm going to add
> > it
> > > > to a
> > > > > section called "future enhancements". This is also feedback that
> > Ryan,
> > > an
> > > > > interested user, gave earlier in this thread.
> > > > >
> > > > > Regarding (3), that's definitely a good point and there are some
> real
> > > use
> > > > > cases, in addition to what you mentioned, to use this in single
> > cluster
> > > > > mode (see *[1] *above). DynamicKafkaSource may be confusing because
> > it
> > > is
> > > > > really similar to the KafkaDynamicSource/Sink (table connectors).
> > > > >
> > > > > Best,
> > > > > Mason
> > > > >
> > > > > On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <
> > > tzuli...@apache.org
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Mason,
> > > > > >
> > > > > > Thanks for updating the FLIP. In principle, I believe this would
> > be a
> > > > > > useful addition. Some comments so far:
> > > > > >
> > > > > > 1. In this sequence diagram [1], why is there a need for a
> > > > > > GetMetadataUpdateEvent from the MultiClusterSourceReader going to
> > the
> > > > > > MultiClusterSourceEnumerator? Shouldn't the enumerator simply
> start
> > > > > sending
> > > > > > metadata update events to the reader once it is registered at the
> > > > > > enumerator?
> > > > > >
> > > > > > 2. Looking at the new builder API, there's a few configurations
> > that
> > > > are
> > > > > > common across *all *discovered Kafka clusters / topics,
> > specifically
> > > > the
> > > > > > deserialization schema, offset initialization strategy, Kafka
> > client
> > > > > > properties, and consumer group ID. Is there any use case that
> users
> > > > would
> > > > > > want to have these configurations differ across different Kafka
> > > > clusters?
> > > > > > If that's the case, would it make more sense to encapsulate these
> > > > > > configurations to be owned by the metadata service?
> > > > > >
> > > > > > 3. Is MultiClusterKafkaSource the best name for this connector? I
> > > find
> > > > > that
> > > > > > the dynamic aspect of Kafka connectivity to be a more defining
> > > > > > characteristic, and that is the main advantage it has compared to
> > the
> > > > > > static KafkaSource. A user may want to use this new connector
> over
> > > > > > KafkaSource even if they're just consuming from a single Kafka
> > > cluster;
> > > > > for
> > > > > > example, one immediate use case I can think of is Kafka
> > > repartitioning
> > > > > with
> > > > > > zero Flink job downtime. They create a new topic with higher
> > > > parallelism
> > > > > > and repartition their Kafka records from the old topic to the new
> > > > topic,
> > > > > > and they want the consuming Flink job to be able to move from the
> > old
> > > > > topic
> > > > > > to the new topic with zero-downtime while retaining exactly-once
> > > > > > guarantees. So, perhaps DynamicKafkaSource is a better name for
> > this
> > > > > > connector?
> > > > > >
> > > > > > Thanks,
> > > > > > Gordon
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
> > > > > >
> > > > > > On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <
> mas.chen6...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Jing,
> > > > > > >
> > > > > > > Thanks for the prompt feedback! I had some confusion with how
> to
> > > > resize
> > > > > > > images in confluence--anyways, I have made the font bigger,
> added
> > > > white
> > > > > > > background, and also made the diagrams themselves bigger.
> > > > > > >
> > > > > > > Regarding the exactly once semantics, that's definitely good to
> > > point
> > > > > out
> > > > > > > in the doc. Thus, I have broken out my "Basic Idea" section
> into:
> > > > > > > 1. an intro
> > > > > > > 2. details about KafkaMetadataService
> > > > > > > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > > > > > > 4. details about exactly once semantics and consistency
> > guarantees
> > > > > > >
> > > > > > > This should give readers enough context about the design goals
> > and
> > > > > > > interactions before deep diving into the class interfaces.
> > > > > > >
> > > > > > > Best,
> > > > > > > Mason
> > > > > > >
> > > > > > > On Tue, Jun 6, 2023 at 1:25 PM Jing Ge
> > <j...@ververica.com.invalid
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Mason,
> > > > > > > >
> > > > > > > > It is a very practical feature that many users are keen to
> use.
> > > > > Thanks
> > > > > > to
> > > > > > > > the previous discussion, the FLIP now looks informative.
> Thanks
> > > for
> > > > > > your
> > > > > > > > proposal. One small suggestion is that the attached images
> are
> > > > quite
> > > > > > > small
> > > > > > > > to read if we don't click and enlarge them. Besides that, It
> is
> > > > > > difficult
> > > > > > > > to read the text on the current sequence diagram because it
> > has a
> > > > > > > > transparent background. Would you like to replace it with a
> > white
> > > > > > > > background?
> > > > > > > >
> > > > > > > > Exactly-one is one of the key features of Kafka connector. I
> > have
> > > > the
> > > > > > > same
> > > > > > > > concern as Qingsheng. Since you have answered questions about
> > it
> > > > > > > > previously, would you like to create an extra section in your
> > > FLIP
> > > > to
> > > > > > > > explicitly describe scenarios when exactly-one is supported
> and
> > > > when
> > > > > it
> > > > > > > is
> > > > > > > > not?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Jing
> > > > > > > >
> > > > > > > > On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <
> > > mas.chen6...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'm working on FLIP-246 again, for the Multi Cluster Kafka
> > > Source
> > > > > > > > > contribution. The document has been updated with some more
> > > > context
> > > > > > > about
> > > > > > > > > how it can solve the Kafka topic removal scenario and a
> > > sequence
> > > > > > > diagram
> > > > > > > > to
> > > > > > > > > illustrate how the components interact.
> > > > > > > > >
> > > > > > > > > Looking forward to any feedback!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Mason
> > > > > > > > >
> > > > > > > > > On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <
> > > > > mas.chen6...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Ryan,
> > > > > > > > > >
> > > > > > > > > > Thanks for the additional context! Yes, the offset
> > > initializer
> > > > > > would
> > > > > > > > need
> > > > > > > > > > to take a cluster as a parameter and the
> > > > > > MultiClusterKafkaSourceSplit
> > > > > > > > can
> > > > > > > > > > be exposed in an initializer.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Mason
> > > > > > > > > >
> > > > > > > > > > On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> > > > > > > > > > ryan.vanhuuksl...@shopify.com> wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi Mason,
> > > > > > > > > >>
> > > > > > > > > >> Thanks for the clarification! In regards to the addition
> > to
> > > > the
> > > > > > > > > >> OffsetInitializer of this API - this would be an awesome
> > > > > addition
> > > > > > > and
> > > > > > > > I
> > > > > > > > > >> think this entire FLIP would be a great addition to the
> > > Flink.
> > > > > > > > > >>
> > > > > > > > > >> To provide more context as to why we need particular
> > > offsets,
> > > > we
> > > > > > use
> > > > > > > > > >> Hybrid Source to currently backfill from buckets prior
> to
> > > > > reading
> > > > > > > from
> > > > > > > > > >> Kafka. We have a service that will tell us what offset
> has
> > > > last
> > > > > > been
> > > > > > > > > loaded
> > > > > > > > > >> into said bucket which we will use to initialize the
> > > > KafkaSource
> > > > > > > > > >> OffsetsInitializer. We couldn't use a timestamp here and
> > the
> > > > > > offset
> > > > > > > > > would
> > > > > > > > > >> be different for each Cluster.
> > > > > > > > > >>
> > > > > > > > > >> In pseudocode, we'd want the ability to do something
> like
> > > this
> > > > > > with
> > > > > > > > > >> HybridSources - if this is possible.
> > > > > > > > > >>
> > > > > > > > > >> ```scala
> > > > > > > > > >> val offsetsMetadata: Map[TopicPartition, Long] = // Get
> > > > current
> > > > > > > > offsets
> > > > > > > > > >> from OffsetReaderService
> > > > > > > > > >> val multiClusterArchiveSource: MultiBucketFileSource[T]
> =
> > //
> > > > > Data
> > > > > > is
> > > > > > > > > read
> > > > > > > > > >> from different buckets (multiple topics)
> > > > > > > > > >> val multiClusterKafkaSource: MultiClusterKafkaSource[T]
> =
> > > > > > > > > >> MultiClusterKafkaSource.builder()
> > > > > > > > > >>   .setKafkaMetadataService(new
> KafkaMetadataServiceImpl())
> > > > > > > > > >>   .setStreamIds(List.of("my-stream-1", "my-stream-2"))
> > > > > > > > > >>   .setGroupId("myConsumerGroup")
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> > > > > > > > > >>   .setStartingOffsets(offsetsMetadata)
> > > > > > > > > >>   .setProperties(properties)
> > > > > > > > > >>   .build()
> > > > > > > > > >> val source =
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> > > > > > > > > >> ```
> > > > > > > > > >>
> > > > > > > > > >> Few notes:
> > > > > > > > > >> - TopicPartition won't work because the topic may be the
> > > same
> > > > > name
> > > > > > > as
> > > > > > > > > >> this is something that is supported IIRC
> > > > > > > > > >> - I chose to pass a map into starting offsets just for
> > > > > > demonstrative
> > > > > > > > > >> purposes, I would be fine with whatever data structure
> > would
> > > > > work
> > > > > > > best
> > > > > > > > > >>
> > > > > > > > > >> Ryan van Huuksloot
> > > > > > > > > >> Data Developer | Production Engineering | Streaming
> > > > Capabilities
> > > > > > > > > >> [image: Shopify]
> > > > > > > > > >> <
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <
> > > > > > mas.chen6...@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >>> Hi Ryan,
> > > > > > > > > >>>
> > > > > > > > > >>> Just copying your message over to the email chain.
> > > > > > > > > >>>
> > > > > > > > > >>> Hi Mason,
> > > > > > > > > >>>> First off, thanks for putting this FLIP together!
> Sorry
> > > for
> > > > > the
> > > > > > > > delay.
> > > > > > > > > >>>> Full disclosure Mason and I chatted a little bit at
> > Flink
> > > > > > Forward
> > > > > > > > > 2022 but
> > > > > > > > > >>>> I have tried to capture the questions I had for him
> > then.
> > > > > > > > > >>>> I'll start the conversation with a few questions:
> > > > > > > > > >>>> 1. The concept of streamIds is not clear to me in the
> > > > proposal
> > > > > > and
> > > > > > > > > >>>> could use some more information. If I understand
> > > correctly,
> > > > > they
> > > > > > > > will
> > > > > > > > > be
> > > > > > > > > >>>> used in the MetadataService to link KafkaClusters to
> > ones
> > > > you
> > > > > > want
> > > > > > > > to
> > > > > > > > > use?
> > > > > > > > > >>>> If you assign stream ids using `setStreamIds`, how can
> > you
> > > > > > > > dynamically
> > > > > > > > > >>>> increase the number of clusters you consume if the
> list
> > of
> > > > > > > StreamIds
> > > > > > > > > is
> > > > > > > > > >>>> static? I am basing this off of your example
> > > > > > > .setStreamIds(List.of(
> > > > > > > > > >>>> "my-stream-1", "my-stream-2")) so I could be off base
> > with
> > > > my
> > > > > > > > > >>>> assumption. If you don't mind clearing up the
> intention,
> > > > that
> > > > > > > would
> > > > > > > > be
> > > > > > > > > >>>> great!
> > > > > > > > > >>>> 2. How would offsets work if you wanted to use this
> > > > > > > > > >>>> MultiClusterKafkaSource with a file based backfill? In
> > the
> > > > > case
> > > > > > I
> > > > > > > am
> > > > > > > > > >>>> thinking of, you have a bucket backed archive of Kafka
> > > data
> > > > > per
> > > > > > > > > cluster.
> > > > > > > > > >>>> and you want to pick up from the last offset in the
> > > archived
> > > > > > > system,
> > > > > > > > > how
> > > > > > > > > >>>> would you set OffsetInitializers "per cluster"
> > potentially
> > > > as
> > > > > a
> > > > > > > > > function or
> > > > > > > > > >>>> are you limited to setting an OffsetInitializer for
> the
> > > > entire
> > > > > > > > Source?
> > > > > > > > > >>>> 3. Just to make sure - because this system will layer
> on
> > > top
> > > > > of
> > > > > > > > > >>>> Flink-27 and use KafkaSource for some aspects under
> the
> > > > hood,
> > > > > > the
> > > > > > > > > watermark
> > > > > > > > > >>>> alignment that was introduced in FLIP-182 / Flink 1.15
> > > would
> > > > > be
> > > > > > > > > possible
> > > > > > > > > >>>> across multiple clusters if you assign them to the
> same
> > > > > > alignment
> > > > > > > > > group?
> > > > > > > > > >>>> Thanks!
> > > > > > > > > >>>> Ryan
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>> 1. The stream ids are static--however, what the
> physical
> > > > > clusters
> > > > > > > and
> > > > > > > > > >>> topics that they map to can mutate. Let's say
> my-stream-1
> > > > maps
> > > > > to
> > > > > > > > > cluster-1
> > > > > > > > > >>> and topic-1. The KafkaMetadataService can return a
> > > different
> > > > > > > mapping
> > > > > > > > > when
> > > > > > > > > >>> metadata is fetched the next time e.g. my-stream-1
> > mapping
> > > to
> > > > > > > > > cluster-1 and
> > > > > > > > > >>> topic-1, and cluster-2 and topic-2. Let me add more
> > details
> > > > on
> > > > > > how
> > > > > > > > the
> > > > > > > > > >>> KafkaMetadataService is used.
> > > > > > > > > >>> 2. The current design limits itself to a single
> > configured
> > > > > > > > > >>> OffsetInitializer that is used for every underlying
> > > > > KafkaSource.
> > > > > > > > > >>> 3. Yes, it is in our plan to integrate this source with
> > > > > watermark
> > > > > > > > > >>> alignment in which the user can align watermarks from
> all
> > > > > > clusters
> > > > > > > > > within
> > > > > > > > > >>> the single. It will leverage the Kafka Source
> > > implementation
> > > > to
> > > > > > > > achieve
> > > > > > > > > >>> this.
> > > > > > > > > >>>
> > > > > > > > > >>> With regards to 2, it's an interesting idea. I think we
> > can
> > > > > > extend
> > > > > > > > the
> > > > > > > > > >>> design to support a map of offset initializers to
> > clusters,
> > > > > which
> > > > > > > > would
> > > > > > > > > >>> solve your file based backfill. If you initialize the
> > > source
> > > > > > with a
> > > > > > > > > single
> > > > > > > > > >>> timestamp, the current design may work for your
> usecase,
> > > but
> > > > I
> > > > > > > can't
> > > > > > > > > tell
> > > > > > > > > >>> without more details. Thanks for your interest and
> sorry
> > > for
> > > > > the
> > > > > > > > delay!
> > > > > > > > > >>>
> > > > > > > > > >>> Best,
> > > > > > > > > >>> Mason
> > > > > > > > > >>>
> > > > > > > > > >>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <
> > > > > > > mas.chen6...@gmail.com>
> > > > > > > > > >>> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>>> Hi Max,
> > > > > > > > > >>>>
> > > > > > > > > >>>> Thanks for taking a look!
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>>> I'm wondering whether we can share some of the code
> of
> > > the
> > > > > > > existing
> > > > > > > > > >>>>> KafkaSource.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>> That is the intention--let me call it out more
> > explicitly.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Regarding your questions:
> > > > > > > > > >>>>
> > > > > > > > > >>>> 1. Indeed, the KafkaMetadataService has the describe
> > > stream
> > > > > > method
> > > > > > > > to
> > > > > > > > > >>>> get a particular stream id. We decided to support
> > getting
> > > > all
> > > > > > the
> > > > > > > > > streams
> > > > > > > > > >>>> for subscribing via a regex pattern (similar to the
> > Kafka
> > > > > Source
> > > > > > > > > >>>> implementation).
> > > > > > > > > >>>>
> > > > > > > > > >>>> 2. The idea was that if metadata is removed that it is
> > no
> > > > > longer
> > > > > > > > > active.
> > > > > > > > > >>>>
> > > > > > > > > >>>> 3. The MetadataUpdateEvent's format is specifically
> for
> > > > > > > > communicating
> > > > > > > > > >>>> to the reader what the clusters and topics it should
> > read
> > > > > from.
> > > > > > It
> > > > > > > > > doesn't
> > > > > > > > > >>>> need stream information since it doesn't interact with
> > the
> > > > > > > > > >>>> KafkaMetadataService (only the enumerator interacts
> with
> > > > it).
> > > > > > > > > >>>>
> > > > > > > > > >>>> 4. Metrics will be reported per cluster. For example,
> > > > > > KafkaSource
> > > > > > > > > >>>> already reports pendingRecords and the corresponding
> > > metric,
> > > > > for
> > > > > > > > > example
> > > > > > > > > >>>> for cluster0, would be a metric called
> > > > > > > > > >>>>
> > > > > `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`.
> > > > > > In
> > > > > > > > > cluster
> > > > > > > > > >>>> removal, these metrics wouldn't be valid so the
> > > > implementation
> > > > > > can
> > > > > > > > > close
> > > > > > > > > >>>> them.
> > > > > > > > > >>>>
> > > > > > > > > >>>> 5. I'm fine with that name; however, I got some
> feedback
> > > > > > > internally
> > > > > > > > > >>>> since the bulk of the logic is in stopping the
> scheduled
> > > > tasks
> > > > > > of
> > > > > > > > the
> > > > > > > > > >>>> underlying enumerators and handling cluster
> > unavailability
> > > > > edge
> > > > > > > > > cases. I'm
> > > > > > > > > >>>> open to changing the name if the design changes (it is
> > an
> > > > > > internal
> > > > > > > > > class
> > > > > > > > > >>>> anyways, so we can make these name changes without
> > > breaking
> > > > > > > users).
> > > > > > > > > >>>>
> > > > > > > > > >>>> 6. Yes, there are some limitations but I have not
> > > > > > > > > >>>> considered implementing that in the basic ConfigMap
> > > > > > > > > >>>> implementation--currently users are allowed to do any
> > > > changes.
> > > > > > For
> > > > > > > > > example,
> > > > > > > > > >>>> a user should not delete and recreate a topic, on the
> > same
> > > > > > > cluster.
> > > > > > > > > >>>> Regarding the logic to properly remove a cluster, a
> user
> > > > could
> > > > > > > > > certainly
> > > > > > > > > >>>> support it with a custom KafkaMetadataService--I
> > intended
> > > to
> > > > > > keep
> > > > > > > > the
> > > > > > > > > >>>> ConfigMap implementation basic for simple use cases
> (so
> > > > users
> > > > > > here
> > > > > > > > > would
> > > > > > > > > >>>> rely on manual monitoring or something built
> > externally).
> > > > > > However,
> > > > > > > > > I'm open
> > > > > > > > > >>>> to the idea if the usage changes and maybe there could
> > be
> > > > > > > > > improvements to
> > > > > > > > > >>>> the Flink metric API to achieve more seamless
> > integration.
> > > > And
> > > > > > > > > finally,
> > > > > > > > > >>>> yes, the semantics are as such and gives reason to my
> > > > response
> > > > > > for
> > > > > > > > > question
> > > > > > > > > >>>> 2.
> > > > > > > > > >>>>
> > > > > > > > > >>>> I've updated the doc with more context from my
> question
> > > > > > responses,
> > > > > > > > let
> > > > > > > > > >>>> me know if there are more questions!
> > > > > > > > > >>>>
> > > > > > > > > >>>> Best,
> > > > > > > > > >>>> Mason
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <
> > > > > > > m...@apache.org>
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>
> > > > > > > > > >>>>> Hey Mason,
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> I just had a look at the FLIP. If I understand
> > correctly,
> > > > you
> > > > > > are
> > > > > > > > > >>>>> proposing a very sophisticated way to read from
> > multiple
> > > > > Kafka
> > > > > > > > > >>>>> clusters
> > > > > > > > > >>>>> / topics.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> I'm wondering whether we can share some of the code
> of
> > > the
> > > > > > > existing
> > > > > > > > > >>>>> KafkaSource. I suppose you don't want to modify
> > > KafkaSource
> > > > > > > itself
> > > > > > > > to
> > > > > > > > > >>>>> avoid any breakage. But it would be good not to
> > duplicate
> > > > too
> > > > > > > much
> > > > > > > > > >>>>> code,
> > > > > > > > > >>>>>   such that functionality that can be shared between
> > the
> > > > two
> > > > > > > > > >>>>> implementations (e.g. the reader implementation).
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Some questions that I had when browsing the current
> > > > version:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 1. Why does KafkaMetadataService have the
> > > `describeStream`
> > > > > > method
> > > > > > > > in
> > > > > > > > > >>>>> addition to `getAllStreams`? This may be redundant.
> Is
> > > the
> > > > > idea
> > > > > > > to
> > > > > > > > > get
> > > > > > > > > >>>>> the updated metadata for a particular StreamId?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 2. KafkaMetaDataService#isClusterActive serves the
> > > purpose
> > > > to
> > > > > > > check
> > > > > > > > > >>>>> for
> > > > > > > > > >>>>> activeness, couldn't this be included in the metadata
> > of
> > > > > > > > KafkaStream?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 3. Shouldn't MetadataUpdateEvent contain a full list
> of
> > > > > > > KafkaStream
> > > > > > > > > >>>>> instead of `Map<KafkaClusterIdentifier,
> Set<String>>`?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 4. "In addition, restarting enumerators involve
> > clearing
> > > > > > outdated
> > > > > > > > > >>>>> metrics" What metrics are we talking about here?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with
> > me.
> > > > How
> > > > > > > about
> > > > > > > > > >>>>> `MultiKafkaSplitEnumeratorContext`?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 6. What about the ConfigMap implementation? Are there
> > any
> > > > > > > > limitations
> > > > > > > > > >>>>> on
> > > > > > > > > >>>>> the type of configuration changes that we want to
> > allow?
> > > > For
> > > > > > > > example,
> > > > > > > > > >>>>> is
> > > > > > > > > >>>>> it allowed to remove a cluster before it has been
> > > drained /
> > > > > > > > > >>>>> deactivated?
> > > > > > > > > >>>>> Is "not active" semantically identical to having the
> > > > cluster
> > > > > /
> > > > > > > > stream
> > > > > > > > > >>>>> removed?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> This is an exciting new addition!
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Cheers,
> > > > > > > > > >>>>> Max
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On 11.08.22 10:10, Mason Chen wrote:
> > > > > > > > > >>>>> > 5. At startup, GetMetadataUpdateEvent is also used
> to
> > > > allow
> > > > > > the
> > > > > > > > > >>>>> > MultiClusterKafkaSourceReader to get the latest
> > > metadata
> > > > > from
> > > > > > > the
> > > > > > > > > >>>>> > enumerator to filter out invalid splits This is how
> > the
> > > > > > reader
> > > > > > > > can
> > > > > > > > > >>>>> solve
> > > > > > > > > >>>>> > "removing" splits/topics in the startup case.
> > > > > > > > > >>>>> >
> > > > > > > > > >>>>> > Sorry for the late response, really appreciate you
> > > > taking a
> > > > > > > look
> > > > > > > > at
> > > > > > > > > >>>>> the
> > > > > > > > > >>>>> > FLIP!
> > > > > > > > > >>>>> >
> > > > > > > > > >>>>> > Best,
> > > > > > > > > >>>>> > Mason
> > > > > > > > > >>>>> >
> > > > > > > > > >>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <
> > > > > > > > mas.chen6...@gmail.com
> > > > > > > > > >
> > > > > > > > > >>>>> wrote:
> > > > > > > > > >>>>> >
> > > > > > > > > >>>>> >> Hi Qingsheng,
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> Thanks for the feedback--these are great points to
> > > > raise.
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> 1. This is something I missed that is now added.
> > More
> > > > > > > generally,
> > > > > > > > > it
> > > > > > > > > >>>>> can
> > > > > > > > > >>>>> >> locate multiple topics in multiple clusters (1
> topic
> > > on
> > > > 1
> > > > > > > > cluster
> > > > > > > > > >>>>> is the
> > > > > > > > > >>>>> >> simplest case).
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> 2. The KafkaMetadataService doesn't interact with
> > the
> > > > > > > > > >>>>> KafkaAdminClients.
> > > > > > > > > >>>>> >> This source merely composes the functionality of
> the
> > > > > > > KafkaSource
> > > > > > > > > so
> > > > > > > > > >>>>> >> KafkaAdminClient interaction is handled by the
> > > > > > > KafkaSubscriber.
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> 3. There are no requirements for the two
> > > > > > clusters--KafkaStream
> > > > > > > > > >>>>> should
> > > > > > > > > >>>>> >> clarify this question. For example, you could move
> > > from
> > > > > > > > topicName1
> > > > > > > > > >>>>> in
> > > > > > > > > >>>>> >> cluster 1 with 11 partitions to topicName2 in
> > cluster
> > > 2
> > > > > with
> > > > > > > 22
> > > > > > > > > >>>>> >> partitions--only the KafkaStream id needs to
> remain
> > > the
> > > > > > same.
> > > > > > > If
> > > > > > > > > >>>>> there are
> > > > > > > > > >>>>> >> no offsets in checkpoint, the offsets are handled
> by
> > > the
> > > > > > > offsets
> > > > > > > > > >>>>> >> initializer from KafkaSource and currently the
> > design
> > > > only
> > > > > > > > exposes
> > > > > > > > > >>>>> 1 option
> > > > > > > > > >>>>> >> for all Kafka clusters, although this could be a
> > > > valuable
> > > > > > > > > extension.
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> 4. Regarding topic and cluster removal, metadata
> is
> > > > > > checkpoint
> > > > > > > > in
> > > > > > > > > >>>>> state
> > > > > > > > > >>>>> >> via the splits. Exactly once can be maintained
> with
> > > the
> > > > > > > > assumption
> > > > > > > > > >>>>> that
> > > > > > > > > >>>>> >> required data from the dead cluster lives in the
> > live
> > > > > > cluster.
> > > > > > > > > This
> > > > > > > > > >>>>> can be
> > > > > > > > > >>>>> >> solved by not destroying the old Kafka cluster
> until
> > > > > > consumers
> > > > > > > > are
> > > > > > > > > >>>>> already
> > > > > > > > > >>>>> >> drained. In switchover, the consumer would consume
> > > from
> > > > > both
> > > > > > > old
> > > > > > > > > >>>>> and new
> > > > > > > > > >>>>> >> clusters. And finally, the metadata can be changed
> > to
> > > > > point
> > > > > > > only
> > > > > > > > > to
> > > > > > > > > >>>>> the new
> > > > > > > > > >>>>> >> cluster when consumers are drained. With the
> regular
> > > > > > > > KafkaSource,
> > > > > > > > > >>>>> if Kafka
> > > > > > > > > >>>>> >> deletes topic or a cluster is destroyed, the
> exactly
> > > > once
> > > > > > > > > semantics
> > > > > > > > > >>>>> are not
> > > > > > > > > >>>>> >> preserved and the semantic is tightly coupled with
> > > > > storage.
> > > > > > > The
> > > > > > > > > >>>>> design
> > > > > > > > > >>>>> >> composes and delegates the responsibilities to
> > > > KafkaSource
> > > > > > > > > >>>>> components so it
> > > > > > > > > >>>>> >> is limited to whatever KafkaSource can do for
> > exactly
> > > > once
> > > > > > > > > >>>>> semantics.
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> 5. Yes, I added more in the FLIP.
> > > GetMetadataUpdateEvent
> > > > > was
> > > > > > > > added
> > > > > > > > > >>>>> to make
> > > > > > > > > >>>>> >> the order of steps in reader restart during split
> > > > > assignment
> > > > > > > > > >>>>> deterministic.
> > > > > > > > > >>>>> >> StoppableKafkaEnumContextProxy are used by the
> > > > underlying
> > > > > > > > > >>>>> >> KafkaSourceEnumerator to assign splits and do
> topic
> > > > > periodic
> > > > > > > > > >>>>> partition
> > > > > > > > > >>>>> >> discovery. So, these scheduled thread pools need
> to
> > be
> > > > > > cleaned
> > > > > > > > up
> > > > > > > > > >>>>> properly
> > > > > > > > > >>>>> >> and splits need to be wrapped with cluster
> > > information.
> > > > > > These
> > > > > > > > > >>>>> details are
> > > > > > > > > >>>>> >> added to the FLIP.
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> Best,
> > > > > > > > > >>>>> >> Mason
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <
> > > > > > > > renqs...@gmail.com
> > > > > > > > > >
> > > > > > > > > >>>>> wrote:
> > > > > > > > > >>>>> >>
> > > > > > > > > >>>>> >>> Hi Mason,
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> Thank you for starting this FLIP!
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>>  From my first glance this FLIP looks like a
> > > collection
> > > > > of
> > > > > > > many
> > > > > > > > > new
> > > > > > > > > >>>>> >>> interfaces, but I can’t stitch them together.
> It’ll
> > > be
> > > > > > great
> > > > > > > to
> > > > > > > > > >>>>> have some
> > > > > > > > > >>>>> >>> brief descriptions about how the source works
> > > > internally.
> > > > > > > Here
> > > > > > > > > are
> > > > > > > > > >>>>> some
> > > > > > > > > >>>>> >>> questions in my mind and please correct me if I
> > > > > > misunderstand
> > > > > > > > > your
> > > > > > > > > >>>>> design.
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> 1. I can’t find the definition (in code) of
> > > > KafkaStream.
> > > > > > As a
> > > > > > > > > part
> > > > > > > > > >>>>> of the
> > > > > > > > > >>>>> >>> public interface KafkaMetadataService it has to
> be
> > > > public
> > > > > > > too.
> > > > > > > > > If I
> > > > > > > > > >>>>> >>> understand correctly it locates a topic on a
> > specific
> > > > > > > cluster.
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> 2. I think there should be a default
> > implementation /
> > > > > > example
> > > > > > > > for
> > > > > > > > > >>>>> >>> KafkaMetadataService for out-of-box usage, for
> > > example
> > > > a
> > > > > > > > wrapper
> > > > > > > > > of
> > > > > > > > > >>>>> >>> multiple Kafka AdminClients that watching
> clusters
> > > > > > > > periodically.
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> 3. It looks like the source has the ability to
> > handle
> > > > > Kafka
> > > > > > > > > cluster
> > > > > > > > > >>>>> >>> failures, like switching connections to another
> > > cluster
> > > > > > > without
> > > > > > > > > >>>>> restarting
> > > > > > > > > >>>>> >>> the Flink job. Is there any requirement for the
> two
> > > > > > clusters?
> > > > > > > > For
> > > > > > > > > >>>>> example
> > > > > > > > > >>>>> >>> they have to be identical in topic names, number
> of
> > > > > > > partitions
> > > > > > > > > and
> > > > > > > > > >>>>> offsets
> > > > > > > > > >>>>> >>> etc.
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> 4. Regarding topic and cluster removal, how to
> > handle
> > > > and
> > > > > > > > recover
> > > > > > > > > >>>>> from
> > > > > > > > > >>>>> >>> checkpoint? Let’s say a topic is removed or
> > migrated
> > > to
> > > > > > > another
> > > > > > > > > >>>>> cluster
> > > > > > > > > >>>>> >>> after a successful checkpoint. If the job tries
> to
> > > roll
> > > > > > back
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > >>>>> >>> checkpoint which still contains the deleted topic
> > or
> > > > info
> > > > > > of
> > > > > > > a
> > > > > > > > > dead
> > > > > > > > > >>>>> >>> cluster, then how to keep the exactly-once
> semantic
> > > > under
> > > > > > > this
> > > > > > > > > >>>>> case?
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> 5. I don’t quite get the design of
> > > > > > > > StoppableKafkaEnumContextProxy
> > > > > > > > > >>>>> and the
> > > > > > > > > >>>>> >>> GetMeradataUpdateEvent. Could you elaborate more
> in
> > > the
> > > > > > FLIP?
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> In a nutshell I think the idea of this FLIP is
> > good,
> > > > > which
> > > > > > > > > extends
> > > > > > > > > >>>>> the
> > > > > > > > > >>>>> >>> usage of Kafka source. However as a design doc,
> > some
> > > > > > details
> > > > > > > > need
> > > > > > > > > >>>>> to be
> > > > > > > > > >>>>> >>> enriched for other users and developers to better
> > > > > > understand
> > > > > > > > how
> > > > > > > > > >>>>> this
> > > > > > > > > >>>>> >>> source works.
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>> Best,
> > > > > > > > > >>>>> >>> Qingsheng
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <
> > > > > > > mas.chen6...@gmail.com
> > > > > > > > >
> > > > > > > > > >>>>> wrote:
> > > > > > > > > >>>>> >>>>
> > > > > > > > > >>>>> >>>> Hi all,
> > > > > > > > > >>>>> >>>>
> > > > > > > > > >>>>> >>>> We would like to start a discussion thread on
> > > > FLIP-246:
> > > > > > > Multi
> > > > > > > > > >>>>> Cluster
> > > > > > > > > >>>>> >>> Kafka
> > > > > > > > > >>>>> >>>> Source [1] where we propose to provide a source
> > > > > connector
> > > > > > > for
> > > > > > > > > >>>>> >>> dynamically
> > > > > > > > > >>>>> >>>> reading from Kafka multiple clusters, which will
> > not
> > > > > > require
> > > > > > > > > >>>>> Flink job
> > > > > > > > > >>>>> >>>> restart. This can greatly improve the Kafka
> > > migration
> > > > > > > > experience
> > > > > > > > > >>>>> for
> > > > > > > > > >>>>> >>>> clusters and topics, and it solves some existing
> > > > > problems
> > > > > > > with
> > > > > > > > > the
> > > > > > > > > >>>>> >>> current
> > > > > > > > > >>>>> >>>> KafkaSource. There was some interest from users
> > [2]
> > > > > from a
> > > > > > > > > meetup
> > > > > > > > > >>>>> and
> > > > > > > > > >>>>> >>> the
> > > > > > > > > >>>>> >>>> mailing list. Looking forward to comments and
> > > > feedback,
> > > > > > > > thanks!
> > > > > > > > > >>>>> >>>>
> > > > > > > > > >>>>> >>>> [1]
> > > > > > > > > >>>>> >>>>
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
> > > > > > > > > >>>>> >>>> [2]
> > > > > > > > > >>>>>
> > > > > > https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
> > > > > > > > > >>>>> >>>>
> > > > > > > > > >>>>> >>>> Best,
> > > > > > > > > >>>>> >>>> Mason
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >>>
> > > > > > > > > >>>>> >
> > > > > > > > > >>>>>
> > > > > > > > > >>>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to