Hi Ryan,

Thanks for the additional context! Yes, the offset initializer would need
to take a cluster as a parameter and the MultiClusterKafkaSourceSplit can
be exposed in an initializer.

Best,
Mason

On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Mason,
>
> Thanks for the clarification! In regards to the addition to the
> OffsetInitializer of this API - this would be an awesome addition and I
> think this entire FLIP would be a great addition to the Flink.
>
> To provide more context as to why we need particular offsets, we use
> Hybrid Source to currently backfill from buckets prior to reading from
> Kafka. We have a service that will tell us what offset has last been loaded
> into said bucket which we will use to initialize the KafkaSource
> OffsetsInitializer. We couldn't use a timestamp here and the offset would
> be different for each Cluster.
>
> In pseudocode, we'd want the ability to do something like this with
> HybridSources - if this is possible.
>
> ```scala
> val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets
> from OffsetReaderService
> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read
> from different buckets (multiple topics)
> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
> MultiClusterKafkaSource.builder()
>   .setKafkaMetadataService(new KafkaMetadataServiceImpl())
>   .setStreamIds(List.of("my-stream-1", "my-stream-2"))
>   .setGroupId("myConsumerGroup")
>
> .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
>   .setStartingOffsets(offsetsMetadata)
>   .setProperties(properties)
>   .build()
> val source =
> HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> ```
>
> Few notes:
> - TopicPartition won't work because the topic may be the same name as this
> is something that is supported IIRC
> - I chose to pass a map into starting offsets just for demonstrative
> purposes, I would be fine with whatever data structure would work best
>
> Ryan van Huuksloot
> Data Developer | Production Engineering | Streaming Capabilities
> [image: Shopify]
> <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
>
>
> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> Just copying your message over to the email chain.
>>
>> Hi Mason,
>>> First off, thanks for putting this FLIP together! Sorry for the delay.
>>> Full disclosure Mason and I chatted a little bit at Flink Forward 2022 but
>>> I have tried to capture the questions I had for him then.
>>> I'll start the conversation with a few questions:
>>> 1. The concept of streamIds is not clear to me in the proposal and could
>>> use some more information. If I understand correctly, they will be used in
>>> the MetadataService to link KafkaClusters to ones you want to use? If you
>>> assign stream ids using `setStreamIds`, how can you dynamically increase
>>> the number of clusters you consume if the list of StreamIds is static? I am
>>> basing this off of your example .setStreamIds(List.of("my-stream-1",
>>> "my-stream-2")) so I could be off base with my assumption. If you don't
>>> mind clearing up the intention, that would be great!
>>> 2. How would offsets work if you wanted to use this
>>> MultiClusterKafkaSource with a file based backfill? In the case I am
>>> thinking of, you have a bucket backed archive of Kafka data per cluster.
>>> and you want to pick up from the last offset in the archived system, how
>>> would you set OffsetInitializers "per cluster" potentially as a function or
>>> are you limited to setting an OffsetInitializer for the entire Source?
>>> 3. Just to make sure - because this system will layer on top of Flink-27
>>> and use KafkaSource for some aspects under the hood, the watermark
>>> alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
>>> across multiple clusters if you assign them to the same alignment group?
>>> Thanks!
>>> Ryan
>>
>>
>> 1. The stream ids are static--however, what the physical clusters and
>> topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1
>> and topic-1. The KafkaMetadataService can return a different mapping when
>> metadata is fetched the next time e.g. my-stream-1 mapping to cluster-1 and
>> topic-1, and cluster-2 and topic-2. Let me add more details on how the
>> KafkaMetadataService is used.
>> 2. The current design limits itself to a single configured
>> OffsetInitializer that is used for every underlying KafkaSource.
>> 3. Yes, it is in our plan to integrate this source with watermark
>> alignment in which the user can align watermarks from all clusters within
>> the single. It will leverage the Kafka Source implementation to achieve
>> this.
>>
>> With regards to 2, it's an interesting idea. I think we can extend the
>> design to support a map of offset initializers to clusters, which would
>> solve your file based backfill. If you initialize the source with a single
>> timestamp, the current design may work for your usecase, but I can't tell
>> without more details. Thanks for your interest and sorry for the delay!
>>
>> Best,
>> Mason
>>
>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com>
>> wrote:
>>
>>> Hi Max,
>>>
>>> Thanks for taking a look!
>>>
>>>
>>>> I'm wondering whether we can share some of the code of the existing
>>>> KafkaSource.
>>>>
>>>
>>> That is the intention--let me call it out more explicitly.
>>>
>>> Regarding your questions:
>>>
>>> 1. Indeed, the KafkaMetadataService has the describe stream method to
>>> get a particular stream id. We decided to support getting all the streams
>>> for subscribing via a regex pattern (similar to the Kafka Source
>>> implementation).
>>>
>>> 2. The idea was that if metadata is removed that it is no longer active.
>>>
>>> 3. The MetadataUpdateEvent's format is specifically for communicating to
>>> the reader what the clusters and topics it should read from. It doesn't
>>> need stream information since it doesn't interact with the
>>> KafkaMetadataService (only the enumerator interacts with it).
>>>
>>> 4. Metrics will be reported per cluster. For example, KafkaSource
>>> already reports pendingRecords and the corresponding metric, for example
>>> for cluster0, would be a metric called
>>> `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. In cluster
>>> removal, these metrics wouldn't be valid so the implementation can close
>>> them.
>>>
>>> 5. I'm fine with that name; however, I got some feedback internally
>>> since the bulk of the logic is in stopping the scheduled tasks of the
>>> underlying enumerators and handling cluster unavailability edge cases. I'm
>>> open to changing the name if the design changes (it is an internal class
>>> anyways, so we can make these name changes without breaking users).
>>>
>>> 6. Yes, there are some limitations but I have not
>>> considered implementing that in the basic ConfigMap
>>> implementation--currently users are allowed to do any changes. For example,
>>> a user should not delete and recreate a topic, on the same cluster.
>>> Regarding the logic to properly remove a cluster, a user could certainly
>>> support it with a custom KafkaMetadataService--I intended to keep the
>>> ConfigMap implementation basic for simple use cases (so users here would
>>> rely on manual monitoring or something built externally). However, I'm open
>>> to the idea if the usage changes and maybe there could be improvements to
>>> the Flink metric API to achieve more seamless integration. And finally,
>>> yes, the semantics are as such and gives reason to my response for question
>>> 2.
>>>
>>> I've updated the doc with more context from my question responses, let
>>> me know if there are more questions!
>>>
>>> Best,
>>> Mason
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Hey Mason,
>>>>
>>>> I just had a look at the FLIP. If I understand correctly, you are
>>>> proposing a very sophisticated way to read from multiple Kafka clusters
>>>> / topics.
>>>>
>>>> I'm wondering whether we can share some of the code of the existing
>>>> KafkaSource. I suppose you don't want to modify KafkaSource itself to
>>>> avoid any breakage. But it would be good not to duplicate too much
>>>> code,
>>>>   such that functionality that can be shared between the two
>>>> implementations (e.g. the reader implementation).
>>>>
>>>> Some questions that I had when browsing the current version:
>>>>
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>>>
>>>> 1. Why does KafkaMetadataService have the `describeStream` method in
>>>> addition to `getAllStreams`? This may be redundant. Is the idea to get
>>>> the updated metadata for a particular StreamId?
>>>>
>>>> 2. KafkaMetaDataService#isClusterActive serves the purpose to check for
>>>> activeness, couldn't this be included in the metadata of KafkaStream?
>>>>
>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream
>>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
>>>>
>>>> 4. "In addition, restarting enumerators involve clearing outdated
>>>> metrics" What metrics are we talking about here?
>>>>
>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about
>>>> `MultiKafkaSplitEnumeratorContext`?
>>>>
>>>> 6. What about the ConfigMap implementation? Are there any limitations
>>>> on
>>>> the type of configuration changes that we want to allow? For example,
>>>> is
>>>> it allowed to remove a cluster before it has been drained /
>>>> deactivated?
>>>> Is "not active" semantically identical to having the cluster / stream
>>>> removed?
>>>>
>>>> This is an exciting new addition!
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 11.08.22 10:10, Mason Chen wrote:
>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the
>>>> > MultiClusterKafkaSourceReader to get the latest metadata from the
>>>> > enumerator to filter out invalid splits This is how the reader can
>>>> solve
>>>> > "removing" splits/topics in the startup case.
>>>> >
>>>> > Sorry for the late response, really appreciate you taking a look at
>>>> the
>>>> > FLIP!
>>>> >
>>>> > Best,
>>>> > Mason
>>>> >
>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> Hi Qingsheng,
>>>> >>
>>>> >> Thanks for the feedback--these are great points to raise.
>>>> >>
>>>> >> 1. This is something I missed that is now added. More generally, it
>>>> can
>>>> >> locate multiple topics in multiple clusters (1 topic on 1 cluster is
>>>> the
>>>> >> simplest case).
>>>> >>
>>>> >> 2. The KafkaMetadataService doesn't interact with the
>>>> KafkaAdminClients.
>>>> >> This source merely composes the functionality of the KafkaSource so
>>>> >> KafkaAdminClient interaction is handled by the KafkaSubscriber.
>>>> >>
>>>> >> 3. There are no requirements for the two clusters--KafkaStream should
>>>> >> clarify this question. For example, you could move from topicName1 in
>>>> >> cluster 1 with 11 partitions to topicName2 in cluster 2 with 22
>>>> >> partitions--only the KafkaStream id needs to remain the same. If
>>>> there are
>>>> >> no offsets in checkpoint, the offsets are handled by the offsets
>>>> >> initializer from KafkaSource and currently the design only exposes 1
>>>> option
>>>> >> for all Kafka clusters, although this could be a valuable extension.
>>>> >>
>>>> >> 4. Regarding topic and cluster removal, metadata is checkpoint in
>>>> state
>>>> >> via the splits. Exactly once can be maintained with the assumption
>>>> that
>>>> >> required data from the dead cluster lives in the live cluster. This
>>>> can be
>>>> >> solved by not destroying the old Kafka cluster until consumers are
>>>> already
>>>> >> drained. In switchover, the consumer would consume from both old and
>>>> new
>>>> >> clusters. And finally, the metadata can be changed to point only to
>>>> the new
>>>> >> cluster when consumers are drained. With the regular KafkaSource, if
>>>> Kafka
>>>> >> deletes topic or a cluster is destroyed, the exactly once semantics
>>>> are not
>>>> >> preserved and the semantic is tightly coupled with storage. The
>>>> design
>>>> >> composes and delegates the responsibilities to KafkaSource
>>>> components so it
>>>> >> is limited to whatever KafkaSource can do for exactly once semantics.
>>>> >>
>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added
>>>> to make
>>>> >> the order of steps in reader restart during split assignment
>>>> deterministic.
>>>> >> StoppableKafkaEnumContextProxy are used by the underlying
>>>> >> KafkaSourceEnumerator to assign splits and do topic periodic
>>>> partition
>>>> >> discovery. So, these scheduled thread pools need to be cleaned up
>>>> properly
>>>> >> and splits need to be wrapped with cluster information. These
>>>> details are
>>>> >> added to the FLIP.
>>>> >>
>>>> >> Best,
>>>> >> Mason
>>>> >>
>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >>> Hi Mason,
>>>> >>>
>>>> >>> Thank you for starting this FLIP!
>>>> >>>
>>>> >>>  From my first glance this FLIP looks like a collection of many new
>>>> >>> interfaces, but I can’t stitch them together. It’ll be great to
>>>> have some
>>>> >>> brief descriptions about how the source works internally. Here are
>>>> some
>>>> >>> questions in my mind and please correct me if I misunderstand your
>>>> design.
>>>> >>>
>>>> >>> 1. I can’t find the definition (in code) of KafkaStream. As a part
>>>> of the
>>>> >>> public interface KafkaMetadataService it has to be public too. If I
>>>> >>> understand correctly it locates a topic on a specific cluster.
>>>> >>>
>>>> >>> 2. I think there should be a default implementation / example for
>>>> >>> KafkaMetadataService for out-of-box usage, for example a wrapper of
>>>> >>> multiple Kafka AdminClients that watching clusters periodically.
>>>> >>>
>>>> >>> 3. It looks like the source has the ability to handle Kafka cluster
>>>> >>> failures, like switching connections to another cluster without
>>>> restarting
>>>> >>> the Flink job. Is there any requirement for the two clusters? For
>>>> example
>>>> >>> they have to be identical in topic names, number of partitions and
>>>> offsets
>>>> >>> etc.
>>>> >>>
>>>> >>> 4. Regarding topic and cluster removal, how to handle and recover
>>>> from
>>>> >>> checkpoint? Let’s say a topic is removed or migrated to another
>>>> cluster
>>>> >>> after a successful checkpoint. If the job tries to roll back to the
>>>> >>> checkpoint which still contains the deleted topic or info of a dead
>>>> >>> cluster, then how to keep the exactly-once semantic under this case?
>>>> >>>
>>>> >>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy
>>>> and the
>>>> >>> GetMeradataUpdateEvent. Could you elaborate more in the FLIP?
>>>> >>>
>>>> >>> In a nutshell I think the idea of this FLIP is good, which extends
>>>> the
>>>> >>> usage of Kafka source. However as a design doc, some details need
>>>> to be
>>>> >>> enriched for other users and developers to better understand how
>>>> this
>>>> >>> source works.
>>>> >>>
>>>> >>> Best,
>>>> >>> Qingsheng
>>>> >>>
>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com>
>>>> wrote:
>>>> >>>>
>>>> >>>> Hi all,
>>>> >>>>
>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi
>>>> Cluster
>>>> >>> Kafka
>>>> >>>> Source [1] where we propose to provide a source connector for
>>>> >>> dynamically
>>>> >>>> reading from Kafka multiple clusters, which will not require Flink
>>>> job
>>>> >>>> restart. This can greatly improve the Kafka migration experience
>>>> for
>>>> >>>> clusters and topics, and it solves some existing problems with the
>>>> >>> current
>>>> >>>> KafkaSource. There was some interest from users [2] from a meetup
>>>> and
>>>> >>> the
>>>> >>>> mailing list. Looking forward to comments and feedback, thanks!
>>>> >>>>
>>>> >>>> [1]
>>>> >>>>
>>>> >>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>>>> >>>> [2]
>>>> https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Mason
>>>> >>>
>>>> >>>
>>>> >
>>>>
>>>

Reply via email to