Hi Mason,

Thanks for the clarification! In regards to the addition to the
OffsetInitializer of this API - this would be an awesome addition and I
think this entire FLIP would be a great addition to the Flink.

To provide more context as to why we need particular offsets, we use Hybrid
Source to currently backfill from buckets prior to reading from Kafka. We
have a service that will tell us what offset has last been loaded into said
bucket which we will use to initialize the KafkaSource OffsetsInitializer.
We couldn't use a timestamp here and the offset would be different for each
Cluster.

In pseudocode, we'd want the ability to do something like this with
HybridSources - if this is possible.

```scala
val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets
from OffsetReaderService
val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read
from different buckets (multiple topics)
val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
MultiClusterKafkaSource.builder()
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")

.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(offsetsMetadata)
  .setProperties(properties)
  .build()
val source =
HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
```

Few notes:
- TopicPartition won't work because the topic may be the same name as this
is something that is supported IIRC
- I chose to pass a map into starting offsets just for demonstrative
purposes, I would be fine with whatever data structure would work best

Ryan van Huuksloot
Data Developer | Production Engineering | Streaming Capabilities
[image: Shopify]
<https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>


On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Ryan,
>
> Just copying your message over to the email chain.
>
> Hi Mason,
>> First off, thanks for putting this FLIP together! Sorry for the delay.
>> Full disclosure Mason and I chatted a little bit at Flink Forward 2022 but
>> I have tried to capture the questions I had for him then.
>> I'll start the conversation with a few questions:
>> 1. The concept of streamIds is not clear to me in the proposal and could
>> use some more information. If I understand correctly, they will be used in
>> the MetadataService to link KafkaClusters to ones you want to use? If you
>> assign stream ids using `setStreamIds`, how can you dynamically increase
>> the number of clusters you consume if the list of StreamIds is static? I am
>> basing this off of your example .setStreamIds(List.of("my-stream-1",
>> "my-stream-2")) so I could be off base with my assumption. If you don't
>> mind clearing up the intention, that would be great!
>> 2. How would offsets work if you wanted to use this
>> MultiClusterKafkaSource with a file based backfill? In the case I am
>> thinking of, you have a bucket backed archive of Kafka data per cluster.
>> and you want to pick up from the last offset in the archived system, how
>> would you set OffsetInitializers "per cluster" potentially as a function or
>> are you limited to setting an OffsetInitializer for the entire Source?
>> 3. Just to make sure - because this system will layer on top of Flink-27
>> and use KafkaSource for some aspects under the hood, the watermark
>> alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
>> across multiple clusters if you assign them to the same alignment group?
>> Thanks!
>> Ryan
>
>
> 1. The stream ids are static--however, what the physical clusters and
> topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1
> and topic-1. The KafkaMetadataService can return a different mapping when
> metadata is fetched the next time e.g. my-stream-1 mapping to cluster-1 and
> topic-1, and cluster-2 and topic-2. Let me add more details on how the
> KafkaMetadataService is used.
> 2. The current design limits itself to a single configured
> OffsetInitializer that is used for every underlying KafkaSource.
> 3. Yes, it is in our plan to integrate this source with watermark
> alignment in which the user can align watermarks from all clusters within
> the single. It will leverage the Kafka Source implementation to achieve
> this.
>
> With regards to 2, it's an interesting idea. I think we can extend the
> design to support a map of offset initializers to clusters, which would
> solve your file based backfill. If you initialize the source with a single
> timestamp, the current design may work for your usecase, but I can't tell
> without more details. Thanks for your interest and sorry for the delay!
>
> Best,
> Mason
>
> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com>
> wrote:
>
>> Hi Max,
>>
>> Thanks for taking a look!
>>
>>
>>> I'm wondering whether we can share some of the code of the existing
>>> KafkaSource.
>>>
>>
>> That is the intention--let me call it out more explicitly.
>>
>> Regarding your questions:
>>
>> 1. Indeed, the KafkaMetadataService has the describe stream method to get
>> a particular stream id. We decided to support getting all the streams for
>> subscribing via a regex pattern (similar to the Kafka Source
>> implementation).
>>
>> 2. The idea was that if metadata is removed that it is no longer active.
>>
>> 3. The MetadataUpdateEvent's format is specifically for communicating to
>> the reader what the clusters and topics it should read from. It doesn't
>> need stream information since it doesn't interact with the
>> KafkaMetadataService (only the enumerator interacts with it).
>>
>> 4. Metrics will be reported per cluster. For example, KafkaSource already
>> reports pendingRecords and the corresponding metric, for example for
>> cluster0, would be a metric called
>> `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. In cluster
>> removal, these metrics wouldn't be valid so the implementation can close
>> them.
>>
>> 5. I'm fine with that name; however, I got some feedback internally since
>> the bulk of the logic is in stopping the scheduled tasks of the underlying
>> enumerators and handling cluster unavailability edge cases. I'm open to
>> changing the name if the design changes (it is an internal class anyways,
>> so we can make these name changes without breaking users).
>>
>> 6. Yes, there are some limitations but I have not considered implementing
>> that in the basic ConfigMap implementation--currently users are allowed to
>> do any changes. For example, a user should not delete and recreate a
>> topic, on the same cluster. Regarding the logic to properly remove a
>> cluster, a user could certainly support it with a custom
>> KafkaMetadataService--I intended to keep the ConfigMap implementation basic
>> for simple use cases (so users here would rely on manual monitoring or
>> something built externally). However, I'm open to the idea if the usage
>> changes and maybe there could be improvements to the Flink metric API to
>> achieve more seamless integration. And finally, yes, the semantics are as
>> such and gives reason to my response for question 2.
>>
>> I've updated the doc with more context from my question responses, let me
>> know if there are more questions!
>>
>> Best,
>> Mason
>>
>>
>>
>>
>>
>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hey Mason,
>>>
>>> I just had a look at the FLIP. If I understand correctly, you are
>>> proposing a very sophisticated way to read from multiple Kafka clusters
>>> / topics.
>>>
>>> I'm wondering whether we can share some of the code of the existing
>>> KafkaSource. I suppose you don't want to modify KafkaSource itself to
>>> avoid any breakage. But it would be good not to duplicate too much code,
>>>   such that functionality that can be shared between the two
>>> implementations (e.g. the reader implementation).
>>>
>>> Some questions that I had when browsing the current version:
>>>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>>
>>> 1. Why does KafkaMetadataService have the `describeStream` method in
>>> addition to `getAllStreams`? This may be redundant. Is the idea to get
>>> the updated metadata for a particular StreamId?
>>>
>>> 2. KafkaMetaDataService#isClusterActive serves the purpose to check for
>>> activeness, couldn't this be included in the metadata of KafkaStream?
>>>
>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream
>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
>>>
>>> 4. "In addition, restarting enumerators involve clearing outdated
>>> metrics" What metrics are we talking about here?
>>>
>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about
>>> `MultiKafkaSplitEnumeratorContext`?
>>>
>>> 6. What about the ConfigMap implementation? Are there any limitations on
>>> the type of configuration changes that we want to allow? For example, is
>>> it allowed to remove a cluster before it has been drained / deactivated?
>>> Is "not active" semantically identical to having the cluster / stream
>>> removed?
>>>
>>> This is an exciting new addition!
>>>
>>> Cheers,
>>> Max
>>>
>>> On 11.08.22 10:10, Mason Chen wrote:
>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the
>>> > MultiClusterKafkaSourceReader to get the latest metadata from the
>>> > enumerator to filter out invalid splits This is how the reader can
>>> solve
>>> > "removing" splits/topics in the startup case.
>>> >
>>> > Sorry for the late response, really appreciate you taking a look at the
>>> > FLIP!
>>> >
>>> > Best,
>>> > Mason
>>> >
>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi Qingsheng,
>>> >>
>>> >> Thanks for the feedback--these are great points to raise.
>>> >>
>>> >> 1. This is something I missed that is now added. More generally, it
>>> can
>>> >> locate multiple topics in multiple clusters (1 topic on 1 cluster is
>>> the
>>> >> simplest case).
>>> >>
>>> >> 2. The KafkaMetadataService doesn't interact with the
>>> KafkaAdminClients.
>>> >> This source merely composes the functionality of the KafkaSource so
>>> >> KafkaAdminClient interaction is handled by the KafkaSubscriber.
>>> >>
>>> >> 3. There are no requirements for the two clusters--KafkaStream should
>>> >> clarify this question. For example, you could move from topicName1 in
>>> >> cluster 1 with 11 partitions to topicName2 in cluster 2 with 22
>>> >> partitions--only the KafkaStream id needs to remain the same. If
>>> there are
>>> >> no offsets in checkpoint, the offsets are handled by the offsets
>>> >> initializer from KafkaSource and currently the design only exposes 1
>>> option
>>> >> for all Kafka clusters, although this could be a valuable extension.
>>> >>
>>> >> 4. Regarding topic and cluster removal, metadata is checkpoint in
>>> state
>>> >> via the splits. Exactly once can be maintained with the assumption
>>> that
>>> >> required data from the dead cluster lives in the live cluster. This
>>> can be
>>> >> solved by not destroying the old Kafka cluster until consumers are
>>> already
>>> >> drained. In switchover, the consumer would consume from both old and
>>> new
>>> >> clusters. And finally, the metadata can be changed to point only to
>>> the new
>>> >> cluster when consumers are drained. With the regular KafkaSource, if
>>> Kafka
>>> >> deletes topic or a cluster is destroyed, the exactly once semantics
>>> are not
>>> >> preserved and the semantic is tightly coupled with storage. The design
>>> >> composes and delegates the responsibilities to KafkaSource components
>>> so it
>>> >> is limited to whatever KafkaSource can do for exactly once semantics.
>>> >>
>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to
>>> make
>>> >> the order of steps in reader restart during split assignment
>>> deterministic.
>>> >> StoppableKafkaEnumContextProxy are used by the underlying
>>> >> KafkaSourceEnumerator to assign splits and do topic periodic partition
>>> >> discovery. So, these scheduled thread pools need to be cleaned up
>>> properly
>>> >> and splits need to be wrapped with cluster information. These details
>>> are
>>> >> added to the FLIP.
>>> >>
>>> >> Best,
>>> >> Mason
>>> >>
>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com>
>>> wrote:
>>> >>
>>> >>> Hi Mason,
>>> >>>
>>> >>> Thank you for starting this FLIP!
>>> >>>
>>> >>>  From my first glance this FLIP looks like a collection of many new
>>> >>> interfaces, but I can’t stitch them together. It’ll be great to have
>>> some
>>> >>> brief descriptions about how the source works internally. Here are
>>> some
>>> >>> questions in my mind and please correct me if I misunderstand your
>>> design.
>>> >>>
>>> >>> 1. I can’t find the definition (in code) of KafkaStream. As a part
>>> of the
>>> >>> public interface KafkaMetadataService it has to be public too. If I
>>> >>> understand correctly it locates a topic on a specific cluster.
>>> >>>
>>> >>> 2. I think there should be a default implementation / example for
>>> >>> KafkaMetadataService for out-of-box usage, for example a wrapper of
>>> >>> multiple Kafka AdminClients that watching clusters periodically.
>>> >>>
>>> >>> 3. It looks like the source has the ability to handle Kafka cluster
>>> >>> failures, like switching connections to another cluster without
>>> restarting
>>> >>> the Flink job. Is there any requirement for the two clusters? For
>>> example
>>> >>> they have to be identical in topic names, number of partitions and
>>> offsets
>>> >>> etc.
>>> >>>
>>> >>> 4. Regarding topic and cluster removal, how to handle and recover
>>> from
>>> >>> checkpoint? Let’s say a topic is removed or migrated to another
>>> cluster
>>> >>> after a successful checkpoint. If the job tries to roll back to the
>>> >>> checkpoint which still contains the deleted topic or info of a dead
>>> >>> cluster, then how to keep the exactly-once semantic under this case?
>>> >>>
>>> >>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy
>>> and the
>>> >>> GetMeradataUpdateEvent. Could you elaborate more in the FLIP?
>>> >>>
>>> >>> In a nutshell I think the idea of this FLIP is good, which extends
>>> the
>>> >>> usage of Kafka source. However as a design doc, some details need to
>>> be
>>> >>> enriched for other users and developers to better understand how this
>>> >>> source works.
>>> >>>
>>> >>> Best,
>>> >>> Qingsheng
>>> >>>
>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com>
>>> wrote:
>>> >>>>
>>> >>>> Hi all,
>>> >>>>
>>> >>>> We would like to start a discussion thread on FLIP-246: Multi
>>> Cluster
>>> >>> Kafka
>>> >>>> Source [1] where we propose to provide a source connector for
>>> >>> dynamically
>>> >>>> reading from Kafka multiple clusters, which will not require Flink
>>> job
>>> >>>> restart. This can greatly improve the Kafka migration experience for
>>> >>>> clusters and topics, and it solves some existing problems with the
>>> >>> current
>>> >>>> KafkaSource. There was some interest from users [2] from a meetup
>>> and
>>> >>> the
>>> >>>> mailing list. Looking forward to comments and feedback, thanks!
>>> >>>>
>>> >>>> [1]
>>> >>>>
>>> >>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>>> >>>> [2]
>>> https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>>> >>>>
>>> >>>> Best,
>>> >>>> Mason
>>> >>>
>>> >>>
>>> >
>>>
>>

Reply via email to