Thanks Omnia, makes sense to me.

> Customers who have their customised ReplicationPolicy will need to add the definition of their internal topics naming convention

I wonder, should we include default impls in the interface to avoid that requirement?

Ryanne

On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

> Hi Mickael and Ryanne,
> I updated the KIP to add these methods to the ReplicationPolicy instead of
> an extra interface to simplify the changes. Please have a look and let me
> know your thoughts.
>
> Thanks
>
> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
>> *(sorry, forgot to Reply to All)*
>> Hi Ryanne,
>> It's a valid concern. I was trying to separate the concerns of internal
>> and replicated policy from each other and to make the code readable, as
>> extending ReplicationPolicy to manage both internal and replicated topics is
>> a bit odd. I'm not against simplifying things by making ReplicationPolicy
>> handle both; at the end of the day, if an MM2 user has a special naming
>> convention for topics, it will affect both replicated and MM2 internal
>> topics.
>>
>> To simplify things, we can extend `ReplicationPolicy` to the following
>> instead of adding an extra class:
>>
>>> public interface ReplicationPolicy {
>>>     String topicSource(String topic);
>>>     String upstreamTopic(String topic);
>>>
>>>     /** Returns heartbeats topic name. */
>>>     String heartbeatsTopic();
>>>
>>>     /** Returns the offset-syncs topic for given cluster alias. */
>>>     String offsetSyncTopic(String targetAlias);
>>>
>>>     /** Returns the name of the checkpoint topic for given cluster alias. */
>>>     String checkpointTopic(String sourceAlias);
>>>
>>>     default String originalTopic(String topic) {
>>>         String upstream = upstreamTopic(topic);
>>>         if (upstream == null) {
>>>             return topic;
>>>         } else {
>>>             return originalTopic(upstream);
>>>         }
>>>     }
>>>
>>>     /** Internal topics are never replicated. */
>>>     boolean isInternalTopic(String topic); // the implementation will be moved to
>>>     // `DefaultReplicationPolicy` to handle both kafka topics and MM2 internal topics.
>>> }
>>
>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>
>>> Omnia, have we considered just adding methods to ReplicationPolicy? I'm
>>> reluctant to add a new class because, as Mickael points out, we'd need to
>>> carry it around in client code.
>>>
>>> Ryanne
>>>
>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>
>>>> Hi Omnia,
>>>>
>>>> Thanks for the clarifications.
>>>>
>>>> - I'm still a bit uneasy with the overlap between these 2 methods as
>>>> currently `ReplicationPolicy.isInternalTopic` already handles MM2
>>>> internal topics. Should we make it only handle Kafka internal topics
>>>> and `isMM2InternalTopic()` only handle MM2 topics?
>>>>
>>>> - I'm not sure I understand what this method is used for. There are no
>>>> such methods for the other 2 topics (offset-sync and heartbeat). Also
>>>> what happens if there are other MM2 instances using different naming
>>>> schemes in the same cluster? Do all instances have to know about the
>>>> other naming schemes? What are the expected issues if they don't?
>>>>
>>>> - RemoteClusterUtils is a client-side utility so it does not have
>>>> access to the MM2 configuration. Since this new API can affect the
>>>> name of the checkpoint topic, it will need to be used client-side too
>>>> so users can find the checkpoint topic name. I hadn't realized this
>>>> was the case.
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >
>>>> > Hi Mickael, did you have some time to check my answer?
>>>> >
>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >>
>>>> >> Hi Mickael,
>>>> >> Thanks for taking another look at the KIP. Regarding your questions:
>>>> >>
>>>> >> - I believe we need both "isMM2InternalTopic" and
>>>> >> `ReplicationPolicy.isInternalTopic`, as `ReplicationPolicy.isInternalTopic`
>>>> >> checks whether a topic is a Kafka internal topic, while `isMM2InternalTopic`
>>>> >> only checks whether a topic is an MM2 internal topic (that is,
>>>> >> heartbeat/checkpoint/offset-sync). The assumption that the default names of
>>>> >> MM2 internal topics match "ReplicationPolicy.isInternalTopic" will no longer
>>>> >> be accurate once we implement this KIP.
>>>> >>
>>>> >> - "isCheckpointTopic" will detect all checkpoint topics for all MM2
>>>> >> instances. This is needed for "MirrorClient.checkpointTopics", which
>>>> >> originally checks whether the topic name ends with CHECKPOINTS_TOPIC_SUFFIX.
>>>> >> So this method just keeps the same functionality that originally exists in
>>>> >> MM2.
>>>> >>
>>>> >> - "checkpointTopic" is used in two places: 1. At topic creation in
>>>> >> "MirrorCheckpointConnector.createInternalTopics", which uses
>>>> >> "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX", and 2. At
>>>> >> "MirrorClient.remoteConsumerOffsets", which is called by
>>>> >> "RemoteClusterUtils.translateOffsets". The cluster alias there is referred
>>>> >> to as "remoteCluster", and the topic name is "remoteClusterAlias +
>>>> >> CHECKPOINTS_TOPIC_SUFFIX" (the alias is an argument in RemoteClusterUtils,
>>>> >> not a config). This is why I called the variable cluster instead of source,
>>>> >> and why I did not use the config to figure out the cluster aliases: it keeps
>>>> >> `RemoteClusterUtils` compatible for existing users. I see a benefit in just
>>>> >> reading the config to find out the cluster aliases, but on the other hand,
>>>> >> I'm not sure why "RemoteClusterUtils" doesn't get the name of the cluster
>>>> >> from the properties instead of an argument, so I decided to keep it just
>>>> >> for compatibility.
>>>> >>
>>>> >> Hope these answer some of your concerns.
>>>> >> Best
>>>> >> Omnia
>>>> >>
>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>> >>>
>>>> >>> Hi Omnia,
>>>> >>>
>>>> >>> Thanks for the updates. Sorry for the delay but I have a few more
>>>> >>> small questions about the API:
>>>> >>> - Do we really need "isMM2InternalTopic()"? There's already
>>>> >>> "ReplicationPolicy.isInternalTopic()". If so, we need to explain the
>>>> >>> difference between these 2 methods.
>>>> >>>
>>>> >>> - Is "isCheckpointTopic()" expected to detect all checkpoint topics
>>>> >>> (for all MM2 instances) or only the ones for this connector instance?
>>>> >>> If it's the latter, I wonder if we could do without the method. As this
>>>> >>> interface is only called by MM2, we could first call
>>>> >>> "checkpointTopic()" and check if that's equal to the topic we're
>>>> >>> checking. If it's the former, we don't really know topic names other
>>>> >>> MM2 instances may be using!
>>>> >>>
>>>> >>> - The 3 methods returning topic names have different APIs:
>>>> >>> "heartbeatsTopic()" takes no arguments, "offsetSyncTopic()" takes the
>>>> >>> target cluster alias and "checkpointTopic()" takes "clusterAlias"
>>>> >>> (which one is it? source or target?). As the interface extends
>>>> >>> Configurable, maybe we could get rid of all the arguments and use the
>>>> >>> config to find the cluster aliases. WDYT?
>>>> >>>
>>>> >>> These are minor concerns, just making sure I fully understand how the
>>>> >>> API is expected to be used. Once these are cleared, I'll be happy to
>>>> >>> vote for this KIP.
>>>> >>>
>>>> >>> Thanks
>>>> >>>
>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >>> >
>>>> >>> > Hi Mickael,
>>>> >>> > Did you get time to review the changes to the KIP? If you are okay
>>>> >>> > with it, could you vote for the KIP here?
>>>> >>> > https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>> >>> > Thanks
>>>> >>> >
>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >>> >>
>>>> >>> >> Hi Mickael,
>>>> >>> >> 1) That's right, the interface and default implementation will be in
>>>> >>> >> mirror-connect.
>>>> >>> >> 2) Renaming the interface should be fine too, especially if you are
>>>> >>> >> planning to move other functionality related to the creation there. I can
>>>> >>> >> edit this.
>>>> >>> >>
>>>> >>> >> If you are okay with that, please vote for the KIP here:
>>>> >>> >> https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>> >>> >>
>>>> >>> >>
>>>> >>> >> Thanks
>>>> >>> >> Omnia
>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>> >>> >>>
>>>> >>> >>> Hi Omnia,
>>>> >>> >>>
>>>> >>> >>> Thank you for the reply, it makes sense.
>>>> >>> >>>
>>>> >>> >>> A couple more comments:
>>>> >>> >>>
>>>> >>> >>> 1) I'm assuming the new interface and default implementation will be
>>>> >>> >>> in the mirror-client project? as the names of some of these topics are
>>>> >>> >>> needed by RemoteClusterUtils on the client-side.
>>>> >>> >>>
>>>> >>> >>> 2) I'm about to open a KIP to specify where the offset-syncs topic is
>>>> >>> >>> created by MM2. In restricted environments, we'd prefer MM2 to only
>>>> >>> >>> have read access to the source cluster and have the offset-syncs on
>>>> >>> >>> the target cluster. I think allowing to specify the cluster where to
>>>> >>> >>> create that topic would be a natural extension of the interface you
>>>> >>> >>> propose here.
>>>> >>> >>>
>>>> >>> >>> So I wonder if your interface could be named InternalTopicsPolicy?
>>>> >>> >>> That's a bit more generic than InternalTopicNamingPolicy. That would
>>>> >>> >>> also match the configuration setting, internal.topics.policy.class,
>>>> >>> >>> you're proposing.
>>>> >>> >>>
>>>> >>> >>> Thanks
>>>> >>> >>>
>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >>> >>> >
>>>> >>> >>> > Hi Mickael,
>>>> >>> >>> > Thanks for your feedback!
>>>> >>> >>> > Regarding your question about having more configurations: I considered
>>>> >>> >>> > adding a configuration for each topic. However, this meant adding more
>>>> >>> >>> > configurations to MM2, which already has so many. Also, the more
>>>> >>> >>> > complicated and advanced the replication pattern you have between
>>>> >>> >>> > clusters, the more configuration lines will be added to your MM2 config,
>>>> >>> >>> > which isn't going to be pretty if you don't have the same topic names
>>>> >>> >>> > across your clusters.
>>>> >>> >>> >
>>>> >>> >>> > Also, it added more complexity to the implementation, as MM2 needs to
>>>> >>> >>> > 1- identify if a topic is a checkpoints topic, so we can list the
>>>> >>> >>> > checkpoints topics in MirrorMaker 2 utils, as one cluster could have X
>>>> >>> >>> > checkpoints topics if it's connected to X clusters. This is done right now
>>>> >>> >>> > by listing any topic with the suffix `.checkpoints.internal`. This could be
>>>> >>> >>> > done by adding a `checkpoints.topic.suffix` config, but this would make the
>>>> >>> >>> > assumption that checkpoints will always have a suffix. Also, having a suffix
>>>> >>> >>> > means that we may need a separator as well to concatenate this suffix with
>>>> >>> >>> > a prefix to identify the source cluster name.
>>>> >>> >>> > 2- identify if a topic is internal, so it shouldn't be replicated or have
>>>> >>> >>> > checkpoints tracked for it. Right now this relies on excluding topics with
>>>> >>> >>> > the `.internal` suffix from replication and from checkpoint tracking, but
>>>> >>> >>> > with configurable topic names we need a way to define what is an internal
>>>> >>> >>> > topic. This could be done by using a list of all internal topics that
>>>> >>> >>> > have been entered in the configuration.
>>>> >>> >>> >
>>>> >>> >>> > So having an interface seemed easier and also gives more flexibility for
>>>> >>> >>> > users to define their own topic names, define what an internal topic means,
>>>> >>> >>> > and how to find checkpoints topics, and it will be a one-line config for
>>>> >>> >>> > each herder. It is also more consistent with the MM2 code, as the MM2
>>>> >>> >>> > config has TopicFilter, ReplicationPolicy, GroupFilter, etc. as interfaces,
>>>> >>> >>> > and they can be overridden by providing a custom implementation for them
>>>> >>> >>> > or have some config that changes their default implementations.
>>>> >>> >>> >
>>>> >>> >>> > Hope this answers your question. I also updated the KIP to add this to the
>>>> >>> >>> > rejected solutions.
>>>> >>> >>> >
>>>> >>> >>> >
>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>> >>> >>> >
>>>> >>> >>> > > Hi Omnia,
>>>> >>> >>> > >
>>>> >>> >>> > > Thanks for the KIP. Indeed being able to configure MM2's internal
>>>> >>> >>> > > topic names would be a nice improvement.
>>>> >>> >>> > >
>>>> >>> >>> > > Looking at the KIP, I was surprised you propose an interface to allow
>>>> >>> >>> > > users to specify names. Have you considered making names changeable
>>>> >>> >>> > > via configurations? If so, we should definitely mention it in the
>>>> >>> >>> > > rejected alternatives as it's the first method that comes to mind.
>>>> >>> >>> > >
>>>> >>> >>> > > I understand an interface gives a lot of flexibility but I'd expect
>>>> >>> >>> > > topic names to be relatively simple and known in advance in most
>>>> >>> >>> > > cases.
>>>> >>> >>> > >
>>>> >>> >>> > > I've not checked all use cases but something like below felt appropriate:
>>>> >>> >>> > > clusters = primary,backup
>>>> >>> >>> > > primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync
>>>> >>> >>> > >
>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> >>> >>> > > >
>>>> >>> >>> > > > Hey everyone,
>>>> >>> >>> > > > Please take a look at KIP-690:
>>>> >>> >>> > > >
>>>> >>> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>> >>> >>> > > >
>>>> >>> >>> > > > Thanks for your feedback and support.
>>>> >>> >>> > > >
>>>> >>> >>> > > > Omnia
>>>> >>> >>> > > >
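
For reference, the "default impls in the interface" idea from the top of this thread might look roughly like the sketch below. It is only an illustration, not the actual Kafka code: the interface name `ReplicationPolicySketch`, the classes `LegacyPolicy` and `PrefixedPolicy`, and the default names for the heartbeats and offset-syncs topics are assumptions made for the example. Only the `.checkpoints.internal` and `.internal` suffixes come from the discussion above. The point is that a custom policy written before the change keeps compiling, while a policy with its own naming convention overrides just what it needs.

```java
/**
 * Hypothetical, trimmed-down stand-in for ReplicationPolicy (not the real
 * Kafka interface). The new internal-topic methods have defaults, so
 * existing implementations do not have to change.
 */
interface ReplicationPolicySketch {
    String upstreamTopic(String topic);

    /** Assumed default name of the heartbeats topic. */
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    /** Assumed default naming scheme for the offset-syncs topic. */
    default String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs." + targetAlias + ".internal";
    }

    /** Uses the `.checkpoints.internal` suffix mentioned in the thread. */
    default String checkpointTopic(String sourceAlias) {
        return sourceAlias + ".checkpoints.internal";
    }

    default boolean isCheckpointTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    /** Simplified rule for this sketch: only the `.internal` suffix counts. */
    default boolean isInternalTopic(String topic) {
        return topic.endsWith(".internal");
    }
}

/** A pre-existing custom policy: implements only what it always had to. */
class LegacyPolicy implements ReplicationPolicySketch {
    @Override
    public String upstreamTopic(String topic) {
        int separator = topic.indexOf('.');
        return separator < 0 ? null : topic.substring(separator + 1);
    }
}

/** A policy with its own (made-up) naming convention for checkpoints. */
class PrefixedPolicy extends LegacyPolicy {
    @Override
    public String checkpointTopic(String sourceAlias) {
        return "mm2." + sourceAlias + ".checkpoints";
    }

    @Override
    public boolean isCheckpointTopic(String topic) {
        return topic.startsWith("mm2.") && topic.endsWith(".checkpoints");
    }

    @Override
    public boolean isInternalTopic(String topic) {
        // Custom checkpoint names must be treated as internal as well.
        return isCheckpointTopic(topic) || super.isInternalTopic(topic);
    }
}

class InternalTopicDefaultsDemo {
    public static void main(String[] args) {
        ReplicationPolicySketch legacy = new LegacyPolicy();
        ReplicationPolicySketch custom = new PrefixedPolicy();
        System.out.println(legacy.checkpointTopic("primary"));
        System.out.println(custom.checkpointTopic("primary"));
        System.out.println(custom.isInternalTopic("orders"));
    }
}
```

One consequence worth noting: once a policy changes a topic name, it also has to keep `isCheckpointTopic` and `isInternalTopic` in step with that name, which is the coupling discussed earlier in the thread.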