Sure, this would make it easier. We can make these functions return the original names ("<clusterAlias>.checkpoints.internal", "mm2-offset-syncs.<clusterAlias>.internal", "heartbeats") by default, without any customisation via `replication.policy.separator`, and use the separator in the DefaultReplicationPolicy.
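Ryanne's suggestion of default implementations, combined with the reply above, could look roughly like the Java sketch below. It is illustrative only: `InternalTopicNaming` and `SeparatorAwareNaming` are hypothetical names standing in for `ReplicationPolicy` and `DefaultReplicationPolicy`, and the method names follow the proposal quoted in this thread. The interface's default methods return the legacy names, so existing custom policies need no changes, while the class rebuilds the same names from a configurable separator.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the methods proposed for ReplicationPolicy in this thread.
 * Default methods return the legacy MM2 internal topic names, so existing custom
 * policies keep behaving as before without overriding anything.
 */
interface InternalTopicNaming {
    /** Legacy heartbeats topic name. */
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    /** Legacy offset-syncs topic name for the given (target) cluster alias. */
    default String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    /** Legacy checkpoints topic name for the given (source) cluster alias. */
    default String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}

/**
 * Hypothetical separator-aware variant, in the spirit of letting the default policy
 * reuse `replication.policy.separator`. With "." it reproduces the legacy names.
 */
class SeparatorAwareNaming implements InternalTopicNaming {
    private final String separator;

    SeparatorAwareNaming(String separator) {
        this.separator = Objects.requireNonNull(separator, "separator");
    }

    @Override
    public String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs" + separator + clusterAlias + separator + "internal";
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }
}
```

`heartbeatsTopic()` is not overridden here because the legacy heartbeats name contains no separator.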
On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> Thanks Omnia, makes sense to me.
>
> > Customers who have their customised ReplicationPolicy will need to add the definition of their internal topics naming convention
>
> I wonder, should we include default impls in the interface to avoid that requirement?
>
> Ryanne
>
> On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
>> Hi Mickael and Ryanne,
>> I updated the KIP to add these methods to the ReplicationPolicy instead of an extra interface, to simplify the changes. Please have a look and let me know your thoughts.
>>
>> Thanks
>>
>> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> *(sorry, forgot to Reply to All)*
>>> Hi Ryanne,
>>> It's a valid concern. I was trying to keep the internal-topic and replicated-topic policies separate and to keep the code readable, as extending ReplicationPolicy to manage both internal and replicated topics is a bit odd. I'm not against simplifying things by making ReplicationPolicy handle both; at the end of the day, if an MM2 user has a special naming convention for topics, it will affect both replicated and MM2 internal topics.
>>>
>>> To simplify things, we can extend `ReplicationPolicy` to the following instead of adding an extra class:
>>>
>>>> public interface ReplicationPolicy {
>>>>     String topicSource(String topic);
>>>>     String upstreamTopic(String topic);
>>>>
>>>>     /** Returns heartbeats topic name. */
>>>>     String heartbeatsTopic();
>>>>
>>>>     /** Returns the offset-syncs topic for given cluster alias. */
>>>>     String offsetSyncTopic(String targetAlias);
>>>>
>>>>     /** Returns the name of the checkpoint topic for given cluster alias. */
>>>>     String checkpointTopic(String sourceAlias);
>>>>
>>>>     default String originalTopic(String topic) {
>>>>         String upstream = upstreamTopic(topic);
>>>>         if (upstream == null) {
>>>>             return topic;
>>>>         } else {
>>>>             return originalTopic(upstream);
>>>>         }
>>>>     }
>>>>
>>>>     /** Internal topics are never replicated. */
>>>>     // The implementation will be moved to `DefaultReplicationPolicy` to handle both Kafka internal topics and MM2 internal topics.
>>>>     boolean isInternalTopic(String topic);
>>>> }
>>>
>>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>
>>>> Omnia, have we considered just adding methods to ReplicationPolicy? I'm reluctant to add a new class because, as Mickael points out, we'd need to carry it around in client code.
>>>>
>>>> Ryanne
>>>>
>>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>
>>>>> Hi Omnia,
>>>>>
>>>>> Thanks for the clarifications.
>>>>>
>>>>> - I'm still a bit uneasy with the overlap between these 2 methods, as currently `ReplicationPolicy.isInternalTopic` already handles MM2 internal topics. Should we make it only handle Kafka internal topics and `isMM2InternalTopic()` only handle MM2 topics?
>>>>>
>>>>> - I'm not sure I understand what this method is used for. There are no such methods for the other 2 topics (offset-sync and heartbeat). Also, what happens if there are other MM2 instances using different naming schemes in the same cluster? Do all instances have to know about the other naming schemes? What are the expected issues if they don't?
>>>>>
>>>>> - RemoteClusterUtils is a client-side utility, so it does not have access to the MM2 configuration. Since this new API can affect the name of the checkpoint topic, it will need to be used client-side too, so users can find the checkpoint topic name. I hadn't realized this was the case.
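Mickael's first point above asks whether `isInternalTopic` should cover only Kafka internal topics and `isMM2InternalTopic` only MM2 ones. One way to remove the overlap is to give each check a single responsibility and let `isInternalTopic` combine them, as in the sketch below. `InternalTopicChecks` is a hypothetical class, the separator-based names are an assumption, and treating a `__` or `.` prefix as Kafka-internal is also an assumption for illustration (`__consumer_offsets` is one such topic). Heartbeats are deliberately not counted as internal, since MM2 replicates heartbeats topics downstream.

```java
/**
 * Hypothetical split of the "is this topic internal?" question into
 * Kafka-internal and MM2-internal checks, combined by isInternalTopic.
 */
class InternalTopicChecks {
    private final String separator;

    InternalTopicChecks(String separator) {
        this.separator = separator;
    }

    /** Assumption: Kafka's own internal topics start with "__" (e.g. __consumer_offsets) or ".". */
    boolean isKafkaInternalTopic(String topic) {
        return topic.startsWith("__") || topic.startsWith(".");
    }

    /** Matches checkpoints topics for any source alias: "<alias><sep>checkpoints<sep>internal". */
    boolean isCheckpointTopic(String topic) {
        return topic.endsWith(separator + "checkpoints" + separator + "internal");
    }

    /** Matches offset-syncs topics for any target alias: "mm2-offset-syncs<sep><alias><sep>internal". */
    boolean isOffsetSyncTopic(String topic) {
        return topic.startsWith("mm2-offset-syncs" + separator)
                && topic.endsWith(separator + "internal");
    }

    /** MM2-only check; heartbeats are excluded because they are meant to be replicated. */
    boolean isMM2InternalTopic(String topic) {
        return isCheckpointTopic(topic) || isOffsetSyncTopic(topic);
    }

    /** Topics matching either check would never be replicated. */
    boolean isInternalTopic(String topic) {
        return isKafkaInternalTopic(topic) || isMM2InternalTopic(topic);
    }
}
```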
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Mickael, did you have some time to check my answer?
>>>>> >
>>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hi Mickael,
>>>>> >> Thanks for taking another look at the KIP. Regarding your questions:
>>>>> >>
>>>>> >> - I believe we need both "isMM2InternalTopic" and `ReplicationPolicy.isInternalTopic`: `ReplicationPolicy.isInternalTopic` checks whether a topic is a Kafka internal topic, while `isMM2InternalTopic` only checks whether a topic is an MM2 internal topic (heartbeat/checkpoint/offset-sync). The assumption that MM2 internal topics match "ReplicationPolicy.isInternalTopic" by default will no longer be accurate once we implement this KIP.
>>>>> >>
>>>>> >> - "isCheckpointTopic" will detect the checkpoint topics of all MM2 instances. This is needed for "MirrorClient.checkpointTopics", which originally checks whether the topic name ends with CHECKPOINTS_TOPIC_SUFFIX, so this method just keeps the functionality that already exists in MM2.
>>>>> >>
>>>>> >> - "checkpointTopic" is used in two places: 1. At topic creation in "MirrorCheckpointConnector.createInternalTopics", which uses "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX", and 2. At "MirrorClient.remoteConsumerOffsets", which is called by "RemoteClusterUtils.translateOffsets". There the cluster alias is referred to as "remoteCluster" and the topic name is "remoteClusterAlias + CHECKPOINTS_TOPIC_SUFFIX" (remoteClusterAlias is an argument in RemoteClusterUtils, not a config). This is why I called the variable "cluster" instead of "source", and why the alias is passed in instead of being read from the config: that is how checkpoints are looked up today, and it keeps `RemoteClusterUtils` compatible for existing users. I see the benefit of just reading the config to find the cluster aliases, but on the other hand, I'm not sure why "RemoteClusterUtils" takes the cluster name as an argument instead of getting it from the properties, so I decided to keep it for compatibility.
>>>>> >>
>>>>> >> Hope these answer some of your concerns.
>>>>> >> Best
>>>>> >> Omnia
>>>>> >>
>>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Hi Omnia,
>>>>> >>>
>>>>> >>> Thanks for the updates. Sorry for the delay, but I have a few more small questions about the API:
>>>>> >>> - Do we really need "isMM2InternalTopic()"? There's already "ReplicationPolicy.isInternalTopic()". If so, we need to explain the difference between these 2 methods.
>>>>> >>>
>>>>> >>> - Is "isCheckpointTopic()" expected to detect all checkpoint topics (for all MM2 instances) or only the ones for this connector instance? If it's the latter, I wonder if we could do without the method. As this interface is only called by MM2, we could first call "checkpointTopic()" and check if that's equal to the topic we're checking. If it's the former, we don't really know what topic names other MM2 instances may be using!
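The two readings of Mickael's `isCheckpointTopic()` question can be contrasted in a short sketch: a suffix filter (what Omnia describes `MirrorClient.checkpointTopics` doing today) finds the checkpoint topics of every MM2 instance that uses the legacy naming, while an exact comparison with a computed name only finds the one for a known alias. `CheckpointTopicLookup` is a hypothetical helper working on a plain set of topic names, not a Kafka API.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical helper contrasting suffix-based and exact-name checkpoint topic detection. */
class CheckpointTopicLookup {
    /** Legacy suffix discussed in the thread. */
    static final String CHECKPOINTS_SUFFIX = ".checkpoints.internal";

    /** Suffix match: returns checkpoint topics for every remote cluster alias, sorted. */
    static Set<String> allCheckpointTopics(Set<String> topics) {
        return topics.stream()
                .filter(topic -> topic.endsWith(CHECKPOINTS_SUFFIX))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Exact match: only answers for one alias whose topic name we can compute. */
    static boolean hasCheckpointTopicFor(Set<String> topics, String remoteClusterAlias) {
        return topics.contains(remoteClusterAlias + CHECKPOINTS_SUFFIX);
    }
}
```

With a custom naming scheme, neither approach sees checkpoint topics named by another instance's policy, which is the gap Mickael points out.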
>>>>> >>>
>>>>> >>> - The 3 methods returning topic names have different APIs: "heartbeatsTopic()" takes no arguments, "offsetSyncTopic()" takes the target cluster alias and "checkpointTopic()" takes "clusterAlias" (which one is it? source or target?). As the interface extends Configurable, maybe we could get rid of all the arguments and use the config to find the cluster aliases. WDYT?
>>>>> >>>
>>>>> >>> These are minor concerns, just making sure I fully understand how the API is expected to be used. Once these are cleared, I'll be happy to vote for this KIP.
>>>>> >>>
>>>>> >>> Thanks
>>>>> >>>
>>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >
>>>>> >>> > Hi Mickael,
>>>>> >>> > Did you get time to review the changes to the KIP? If you're okay with it, could you vote for the KIP here: https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html?
>>>>> >>> > Thanks
>>>>> >>> >
>>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >>
>>>>> >>> >> Hi Mickael,
>>>>> >>> >> 1) That's right, the interface and default implementation will be in mirror-client.
>>>>> >>> >> 2) Renaming the interface should be fine too, especially if you're planning to move other functionality related to topic creation there. I can edit this.
>>>>> >>> >>
>>>>> >>> >> If you are okay with that, please vote for the KIP here: https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>>> >>> >>
>>>>> >>> >>
>>>>> >>> >> Thanks
>>>>> >>> >> Omnia
>>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>> >>> >>>
>>>>> >>> >>> Hi Omnia,
>>>>> >>> >>>
>>>>> >>> >>> Thank you for the reply, it makes sense.
>>>>> >>> >>>
>>>>> >>> >>> A couple more comments:
>>>>> >>> >>>
>>>>> >>> >>> 1) I'm assuming the new interface and default implementation will be in the mirror-client project, as the names of some of these topics are needed by RemoteClusterUtils on the client side?
>>>>> >>> >>>
>>>>> >>> >>> 2) I'm about to open a KIP to specify where the offset-syncs topic is created by MM2. In restricted environments, we'd prefer MM2 to only have read access to the source cluster and to have the offset-syncs topic on the target cluster. I think allowing users to specify the cluster where that topic is created would be a natural extension of the interface you propose here.
>>>>> >>> >>>
>>>>> >>> >>> So I wonder if your interface could be named InternalTopicsPolicy? That's a bit more generic than InternalTopicNamingPolicy. That would also match the configuration setting you're proposing, internal.topics.policy.class.
>>>>> >>> >>>
>>>>> >>> >>> Thanks
>>>>> >>> >>>
>>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >>> >
>>>>> >>> >>> > Hi Mickael,
>>>>> >>> >>> > Thanks for your feedback!
>>>>> >>> >>> > Regarding your question about having more configurations, I considered adding a configuration for each topic; however, this meant adding more configurations to MM2, which already has so many. Also, the more complicated and advanced the replication pattern between your clusters, the more configuration lines get added to your MM2 config, which isn't going to be pretty if you don't have the same topic names across your clusters.
>>>>> >>> >>> >
>>>>> >>> >>> > Also, it added more complexity to the implementation, as MM2 needs to:
>>>>> >>> >>> > 1- identify whether a topic is a checkpoints topic, so we can list the checkpoints topics in the MirrorMaker 2 utils; one cluster could have X checkpoints topics if it's connected to X clusters. This is currently done by listing any topic with the suffix `.checkpoints.internal`. It could be done by adding a `checkpoints.topic.suffix` config, but this would assume that checkpoints topics always have a suffix, and having a suffix means we may also need a separator to concatenate it with a prefix identifying the source cluster name.
>>>>> >>> >>> > 2- identify whether a topic is internal, so it isn't replicated and no checkpoints are tracked for it. Right now this relies on topics with the `.internal` suffix not being replicated or tracked in checkpoints, but once topic names are configurable we need a way to define what an internal topic is. This could be done by using a list of all internal topics entered in the configuration.
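To illustrate point 1 above: under the rejected config-based alternative, recognising a checkpoints topic and recovering its source cluster alias needs both a suffix and a separator. The `checkpoints.topic.suffix` setting was only floated in this thread and is not an MM2 setting; `SuffixConfigSketch` is a hypothetical class that assumes the alias is always a prefix.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the rejected config-based approach: a configured suffix
 * (e.g. the floated `checkpoints.topic.suffix`) plus a separator identify checkpoints topics.
 */
class SuffixConfigSketch {
    private final String suffix;
    private final String separator;

    SuffixConfigSketch(String suffix, String separator) {
        this.suffix = suffix;
        this.separator = separator;
    }

    /** A checkpoints topic is "<sourceAlias><separator><suffix>" with a non-empty alias. */
    boolean isCheckpointTopic(String topic) {
        String tail = separator + suffix;
        return topic.endsWith(tail) && topic.length() > tail.length();
    }

    /** Strips the separator and suffix to recover the source cluster alias. */
    Optional<String> sourceAlias(String topic) {
        if (!isCheckpointTopic(topic)) {
            return Optional.empty();
        }
        return Optional.of(topic.substring(0, topic.length() - separator.length() - suffix.length()));
    }
}
```

The sketch also makes the built-in assumption visible: a naming policy that puts the alias anywhere other than a prefix cannot be expressed with a suffix and a separator alone.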
>>>>> >>> >>> >
>>>>> >>> >>> > So having an interface seemed easier. It also gives users more flexibility to define their own topic names, what an internal topic means and how to find checkpoints topics, with a one-line config for each herder. It's also more consistent with the MM2 code, as MM2 config has TopicFilter, ReplicationPolicy, GroupFilter, etc. as interfaces that can be overridden by providing a custom implementation, or that have configs changing their default implementations.
>>>>> >>> >>> >
>>>>> >>> >>> > Hope this answers your question. I also updated the KIP to add this to the rejected solutions.
>>>>> >>> >>> >
>>>>> >>> >>> >
>>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>> >>> >>> >
>>>>> >>> >>> > > Hi Omnia,
>>>>> >>> >>> > >
>>>>> >>> >>> > > Thanks for the KIP. Indeed, being able to configure MM2's internal topic names would be a nice improvement.
>>>>> >>> >>> > >
>>>>> >>> >>> > > Looking at the KIP, I was surprised you propose an interface to allow users to specify names. Have you considered making names changeable via configurations? If so, we should definitely mention it in the rejected alternatives, as it's the first method that comes to mind.
>>>>> >>> >>> > >
>>>>> >>> >>> > > I understand an interface gives a lot of flexibility, but I'd expect topic names to be relatively simple and known in advance in most cases.
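Omnia's argument is that, like TopicFilter or ReplicationPolicy, the naming logic can be a pluggable class selected with one line of config per herder. A minimal sketch of that pattern using plain Java reflection follows. `NamingPolicy`, `LegacyNamingPolicy` and `PolicyLoader` are hypothetical, and `internal.topics.policy.class` is the key mentioned in this thread, used here only for illustration; this is not MM2's own loading code.

```java
import java.util.Map;

/** Hypothetical pluggable naming interface (a stand-in, not Kafka's API). */
interface NamingPolicy {
    String checkpointTopic(String clusterAlias);
}

/** Default implementation reproducing the legacy checkpoints topic name. */
class LegacyNamingPolicy implements NamingPolicy {
    @Override
    public String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}

/** Instantiates the policy class named in config, falling back to the legacy policy. */
class PolicyLoader {
    /** Config key proposed in this thread; used here for illustration only. */
    static final String POLICY_CLASS_CONFIG = "internal.topics.policy.class";

    static NamingPolicy load(Map<String, String> config) {
        String className = config.getOrDefault(POLICY_CLASS_CONFIG, LegacyNamingPolicy.class.getName());
        try {
            Class<?> cls = Class.forName(className);
            if (!NamingPolicy.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " does not implement NamingPolicy");
            }
            // Requires a no-arg constructor accessible from this class.
            return (NamingPolicy) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Swapping the naming scheme then means shipping one class and changing one config line, rather than adding a name setting for every topic and replication flow.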
>>>>> >>> >>> > >
>>>>> >>> >>> > > I've not checked all use cases, but something like below felt appropriate:
>>>>> >>> >>> > > clusters = primary,backup
>>>>> >>> >>> > > primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync
>>>>> >>> >>> > >
>>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Hey everyone,
>>>>> >>> >>> > > > Please take a look at KIP-690:
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Thanks for your feedback and support.
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Omnia
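For comparison, Mickael's per-flow configuration idea could be resolved roughly as follows. The `<source>-><target>.offsets-sync.topic` key shape comes from his example and is hypothetical (it is not an existing MM2 setting), as is `FlowTopicOverrides`; when no override is set, the sketch falls back to the legacy `mm2-offset-syncs.<targetAlias>.internal` name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical resolver for per-flow offset-syncs topic overrides in properties format. */
class FlowTopicOverrides {
    private final Properties props = new Properties();

    FlowTopicOverrides(String configText) {
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the override for source->target if configured, else the legacy name. */
    String offsetSyncTopic(String sourceAlias, String targetAlias) {
        String key = sourceAlias + "->" + targetAlias + ".offsets-sync.topic";
        return props.getProperty(key, "mm2-offset-syncs." + targetAlias + ".internal");
    }
}
```

Each additional replication flow with a custom name needs another line like this, which is the config growth Omnia describes in her reply.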