Hi Ryan,

Thanks for the additional context! Yes, the offsets initializer would need to take a cluster as a parameter, and the MultiClusterKafkaSourceSplit can be exposed in an initializer.
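To sketch the direction (all names here are hypothetical and not part of the FLIP yet; I'm only reusing the FLIP's KafkaClusterIdentifier and Flink's existing OffsetsInitializer):

```scala
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Hypothetical cluster-aware offsets initializer: the source would resolve a
// per-cluster OffsetsInitializer instead of one shared by all clusters.
trait MultiClusterOffsetsInitializer extends Serializable {
  def forCluster(cluster: KafkaClusterIdentifier): OffsetsInitializer
}

// Example: look up offsets in a per-cluster map (e.g. populated from an
// external offset reader service), falling back to earliest for clusters
// without recorded offsets.
class MapBackedOffsetsInitializer(
    offsetsByCluster: Map[KafkaClusterIdentifier, Map[TopicPartition, java.lang.Long]])
  extends MultiClusterOffsetsInitializer {

  override def forCluster(cluster: KafkaClusterIdentifier): OffsetsInitializer =
    offsetsByCluster
      .get(cluster)
      .map(offsets => OffsetsInitializer.offsets(offsets.asJava))
      .getOrElse(OffsetsInitializer.earliest())
}
```

Something map-based like this would cover your backfill case, since each cluster resolves its own starting offsets and newly added clusters can still fall back to a default.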
Best,
Mason

On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote:

> Hi Mason,
>
> Thanks for the clarification! In regards to the addition to the OffsetsInitializer of this API - this would be an awesome addition, and I think this entire FLIP would be a great addition to Flink.
>
> To provide more context as to why we need particular offsets: we currently use Hybrid Source to backfill from buckets prior to reading from Kafka. We have a service that will tell us what offset has last been loaded into said bucket, which we use to initialize the KafkaSource OffsetsInitializer. We couldn't use a timestamp here, and the offset would be different for each cluster.
>
> In pseudocode, we'd want the ability to do something like this with HybridSources - if this is possible.
>
> ```scala
> val offsetsMetadata: Map[TopicPartition, Long] = ??? // get current offsets from OffsetReaderService
> val multiClusterArchiveSource: MultiBucketFileSource[T] = ??? // data is read from different buckets (multiple topics)
> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
>   MultiClusterKafkaSource.builder()
>     .setKafkaMetadataService(new KafkaMetadataServiceImpl())
>     .setStreamIds(java.util.List.of("my-stream-1", "my-stream-2"))
>     .setGroupId("myConsumerGroup")
>     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
>     .setStartingOffsets(offsetsMetadata)
>     .setProperties(properties)
>     .build()
> val source =
>   HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> ```
>
> A few notes:
> - TopicPartition alone won't work, because the same topic name may exist on multiple clusters (IIRC that is supported)
> - I chose to pass a map into starting offsets just for demonstrative purposes; I would be fine with whatever data structure works best
>
> Ryan van Huuksloot
> Data Developer | Production Engineering | Streaming Capabilities
> Shopify <https://www.shopify.com/>
>
> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> Just copying your message over to the email chain.
>>
>>> Hi Mason,
>>>
>>> First off, thanks for putting this FLIP together! Sorry for the delay. Full disclosure: Mason and I chatted a little bit at Flink Forward 2022, but I have tried to capture the questions I had for him then.
>>>
>>> I'll start the conversation with a few questions:
>>>
>>> 1. The concept of streamIds is not clear to me in the proposal and could use some more information. If I understand correctly, they will be used in the MetadataService to link KafkaClusters to the streams you want to use? If you assign stream ids using `setStreamIds`, how can you dynamically increase the number of clusters you consume if the list of StreamIds is static? I am basing this off of your example .setStreamIds(List.of("my-stream-1", "my-stream-2")), so I could be off base with my assumption. If you don't mind clearing up the intention, that would be great!
>>>
>>> 2. How would offsets work if you wanted to use this MultiClusterKafkaSource with a file-based backfill? In the case I am thinking of, you have a bucket-backed archive of Kafka data per cluster, and you want to pick up from the last offset in the archived system. How would you set OffsetsInitializers "per cluster", potentially as a function, or are you limited to setting an OffsetsInitializer for the entire Source?
>>> 3. Just to make sure - because this system will layer on top of FLIP-27 and use KafkaSource for some aspects under the hood, the watermark alignment that was introduced in FLIP-182 / Flink 1.15 would be possible across multiple clusters if you assign them to the same alignment group?
>>>
>>> Thanks!
>>> Ryan
>>
>> 1. The stream ids are static; however, the physical clusters and topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1 and topic-1. The KafkaMetadataService can return a different mapping the next time metadata is fetched, e.g. my-stream-1 mapping to cluster-1 and topic-1, and cluster-2 and topic-2. Let me add more details on how the KafkaMetadataService is used.
>>
>> 2. The current design limits itself to a single configured OffsetsInitializer that is used for every underlying KafkaSource.
>>
>> 3. Yes, it is in our plan to integrate this source with watermark alignment, in which the user can align watermarks from all clusters within the single source. It will leverage the KafkaSource implementation to achieve this.
>>
>> With regards to 2, it's an interesting idea. I think we can extend the design to support a map of offset initializers to clusters, which would solve your file-based backfill. If you initialize the source with a single timestamp, the current design may work for your use case, but I can't tell without more details. Thanks for your interest and sorry for the delay!
>>
>> Best,
>> Mason
>>
>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>>
>>> Hi Max,
>>>
>>> Thanks for taking a look!
>>>
>>>> I'm wondering whether we can share some of the code of the existing KafkaSource.
>>>
>>> That is the intention--let me call it out more explicitly.
>>>
>>> Regarding your questions:
>>>
>>> 1. Indeed, the KafkaMetadataService has the describeStream method to get the metadata for a particular stream id. We decided to also support getting all the streams for subscribing via a regex pattern (similar to the KafkaSource implementation).
>>>
>>> 2. The idea was that if the metadata is removed, the cluster is no longer active.
>>>
>>> 3. The MetadataUpdateEvent's format is specifically for communicating to the reader which clusters and topics it should read from. It doesn't need stream information, since the reader doesn't interact with the KafkaMetadataService (only the enumerator interacts with it).
>>>
>>> 4. Metrics will be reported per cluster. For example, KafkaSource already reports pendingRecords; the corresponding metric for cluster0 would be called `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. After cluster removal, these metrics would no longer be valid, so the implementation can close them.
>>>
>>> 5. I'm fine with that name; however, I got some feedback internally, since the bulk of the logic is in stopping the scheduled tasks of the underlying enumerators and handling cluster unavailability edge cases. I'm open to changing the name if the design changes (it is an internal class anyway, so we can make these name changes without breaking users).
>>>
>>> 6. Yes, there are some limitations, but I have not considered implementing that in the basic ConfigMap implementation--currently users are allowed to make any change. For example, a user should not delete and recreate a topic on the same cluster.
>>> Regarding the logic to properly remove a cluster, a user could certainly support it with a custom KafkaMetadataService--I intended to keep the ConfigMap implementation basic for simple use cases (so users here would rely on manual monitoring or something built externally). However, I'm open to the idea if the usage changes, and maybe there could be improvements to the Flink metric API to achieve more seamless integration. And finally, yes, the semantics are as such, which is the reason for my response to question 2.
>>>
>>> I've updated the doc with more context from my question responses; let me know if there are more questions!
>>>
>>> Best,
>>> Mason
>>>
>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org> wrote:
>>>
>>>> Hey Mason,
>>>>
>>>> I just had a look at the FLIP. If I understand correctly, you are proposing a very sophisticated way to read from multiple Kafka clusters / topics.
>>>>
>>>> I'm wondering whether we can share some of the code of the existing KafkaSource. I suppose you don't want to modify KafkaSource itself to avoid any breakage. But it would be good not to duplicate too much code, so that functionality can be shared between the two implementations (e.g. the reader implementation).
>>>>
>>>> Some questions that I had when browsing the current version:
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>>>
>>>> 1. Why does KafkaMetadataService have the `describeStream` method in addition to `getAllStreams`? This may be redundant. Is the idea to get the updated metadata for a particular StreamId?
>>>>
>>>> 2. KafkaMetadataService#isClusterActive serves the purpose of checking for activeness; couldn't this be included in the metadata of KafkaStream?
>>>>
>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream instead of `Map<KafkaClusterIdentifier, Set<String>>`?
>>>>
>>>> 4. "In addition, restarting enumerators involve clearing outdated metrics" - What metrics are we talking about here?
>>>>
>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about `MultiKafkaSplitEnumeratorContext`?
>>>>
>>>> 6. What about the ConfigMap implementation? Are there any limitations on the type of configuration changes that we want to allow? For example, is it allowed to remove a cluster before it has been drained / deactivated? Is "not active" semantically identical to having the cluster / stream removed?
>>>>
>>>> This is an exciting new addition!
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 11.08.22 10:10, Mason Chen wrote:
>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the MultiClusterKafkaSourceReader to get the latest metadata from the enumerator to filter out invalid splits. This is how the reader can solve "removing" splits/topics in the startup case.
>>>> >
>>>> > Sorry for the late response, really appreciate you taking a look at the FLIP!
>>>> >
>>>> > Best,
>>>> > Mason
>>>> >
>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>>>> >
>>>> >> Hi Qingsheng,
>>>> >>
>>>> >> Thanks for the feedback--these are great points to raise.
>>>> >>
>>>> >> 1. This is something I missed that is now added. More generally, it can locate multiple topics in multiple clusters (1 topic on 1 cluster is the simplest case).
>>>> >>
>>>> >> 2. The KafkaMetadataService doesn't interact with the KafkaAdminClients. This source merely composes the functionality of the KafkaSource, so KafkaAdminClient interaction is handled by the KafkaSubscriber.
>>>> >>
>>>> >> 3. There are no requirements for the two clusters--KafkaStream should clarify this question. For example, you could move from topicName1 in cluster 1 with 11 partitions to topicName2 in cluster 2 with 22 partitions--only the KafkaStream id needs to remain the same. If there are no offsets in checkpoint, the offsets are handled by the offsets initializer from KafkaSource, and currently the design only exposes one option for all Kafka clusters, although this could be a valuable extension.
>>>> >>
>>>> >> 4. Regarding topic and cluster removal, metadata is checkpointed in state via the splits. Exactly-once can be maintained under the assumption that the required data from the dead cluster lives in the live cluster. This can be solved by not destroying the old Kafka cluster until consumers are already drained. In switchover, the consumer would consume from both old and new clusters. And finally, the metadata can be changed to point only to the new cluster when consumers are drained. With the regular KafkaSource, if a Kafka topic is deleted or a cluster is destroyed, the exactly-once semantics are not preserved, and the semantics are tightly coupled with storage. The design composes and delegates the responsibilities to KafkaSource components, so it is limited to whatever KafkaSource can do for exactly-once semantics.
>>>> >>
>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make the order of steps in reader restart during split assignment deterministic. StoppableKafkaEnumContextProxy is used by the underlying KafkaSourceEnumerator to assign splits and do periodic topic partition discovery. So, these scheduled thread pools need to be cleaned up properly, and splits need to be wrapped with cluster information. These details are added to the FLIP.
>>>> >>
>>>> >> Best,
>>>> >> Mason
>>>> >>
>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>>>> >>
>>>> >>> Hi Mason,
>>>> >>>
>>>> >>> Thank you for starting this FLIP!
>>>> >>>
>>>> >>> From my first glance this FLIP looks like a collection of many new interfaces, but I can't stitch them together. It'll be great to have some brief descriptions of how the source works internally. Here are some questions in my mind; please correct me if I misunderstand your design.
>>>> >>>
>>>> >>> 1. I can't find the definition (in code) of KafkaStream. As a part of the public interface KafkaMetadataService, it has to be public too. If I understand correctly, it locates a topic on a specific cluster.
>>>> >>>
>>>> >>> 2. I think there should be a default implementation / example of KafkaMetadataService for out-of-box usage, for example a wrapper of multiple Kafka AdminClients that watches clusters periodically.
>>>> >>>
>>>> >>> 3. It looks like the source has the ability to handle Kafka cluster failures, like switching connections to another cluster without restarting the Flink job.
>>>> >>> Is there any requirement for the two clusters? For example, do they have to be identical in topic names, number of partitions, offsets, etc.?
>>>> >>>
>>>> >>> 4. Regarding topic and cluster removal, how to handle and recover from checkpoint? Let's say a topic is removed or migrated to another cluster after a successful checkpoint. If the job tries to roll back to the checkpoint which still contains the deleted topic or the info of a dead cluster, then how to keep the exactly-once semantics in this case?
>>>> >>>
>>>> >>> 5. I don't quite get the design of StoppableKafkaEnumContextProxy and the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
>>>> >>>
>>>> >>> In a nutshell, I think the idea of this FLIP is good: it extends the usage of the Kafka source. However, as a design doc, some details need to be enriched for other users and developers to better understand how this source works.
>>>> >>>
>>>> >>> Best,
>>>> >>> Qingsheng
>>>> >>>
>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Hi all,
>>>> >>>>
>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi Cluster Kafka Source [1], where we propose to provide a source connector for dynamically reading from multiple Kafka clusters, which will not require a Flink job restart. This can greatly improve the Kafka migration experience for clusters and topics, and it solves some existing problems with the current KafkaSource. There was some interest from users [2] from a meetup and the mailing list. Looking forward to comments and feedback, thanks!
>>>> >>>>
>>>> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>>>> >>>> [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Mason
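To make the pieces discussed in this thread concrete, the interfaces could fit together roughly as in the sketch below. The method names (getAllStreams, describeStream, isClusterActive) come from the discussion above; the signatures and fields are illustrative guesses, not the FLIP's actual API.

```scala
// Non-normative sketch; signatures are guesses, see the FLIP for the proposal.

// Identifies one physical Kafka cluster.
case class KafkaClusterIdentifier(name: String, bootstrapServers: String)

// A logical stream id mapped to the physical clusters and topics backing it.
// The mapping can change between metadata fetches: during a migration,
// my-stream-1 may go from {cluster-1 -> topic-1} to
// {cluster-1 -> topic-1, cluster-2 -> topic-2} and finally to
// {cluster-2 -> topic-2} once consumers are drained.
case class KafkaStream(
    streamId: String,
    clusterTopics: Map[KafkaClusterIdentifier, Set[String]])

// Resolves logical streams to physical clusters and topics. Only the
// enumerator talks to this service; readers receive already-resolved
// cluster/topic metadata via MetadataUpdateEvent.
trait KafkaMetadataService {
  // All streams known to the service, used for regex-style subscription.
  def getAllStreams(): Set[KafkaStream]

  // Current metadata for a particular stream id, used for periodic refresh.
  def describeStream(streamId: String): KafkaStream

  // Whether a cluster is still active; removed metadata implies inactive.
  def isClusterActive(cluster: KafkaClusterIdentifier): Boolean
}
```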