Sure, this would make it easier. We can make these functions return the
original behaviour (<clusterAlias>.checkpoints.internal,
"mm2-offset-syncs.<clusterAlias>.internal", heartbeats) without any
customisation via `replication.policy.separator`, and use the separator in
the DefaultReplicationPolicy.
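
A minimal sketch of what such default implementations could look like. The interface name `ReplicationPolicySketch` is hypothetical and trimmed down for illustration; it is not the real Kafka interface, and the defaults simply hard-code the original names listed above.

```java
class DefaultNamesDemo {
    public static void main(String[] args) {
        // A custom policy that overrides nothing still gets the original names.
        ReplicationPolicySketch custom = new ReplicationPolicySketch() { };
        System.out.println(custom.heartbeatsTopic());
        System.out.println(custom.offsetSyncTopic("backup"));
        System.out.println(custom.checkpointTopic("primary"));
    }
}

/** Hypothetical stand-in for ReplicationPolicy; only the internal-topic methods are shown. */
interface ReplicationPolicySketch {
    /** Assumed default: the original heartbeats topic name. */
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    /** Assumed default: the original offset-syncs topic name for the given alias. */
    default String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    /** Assumed default: the original checkpoints topic name for the given alias. */
    default String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}
```

With defaults like these, existing custom policies would keep compiling and keep the current internal topic names unless they choose to override them.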

On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Thanks Omnia, makes sense to me.
>
> > Customers who have their customised ReplicationPolicy will need to add
> the definition of their internal topics naming convention
>
> I wonder whether we should include default impls in the interface to avoid
> that requirement?
>
> Ryanne
>
> On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hi Mickael and Ryanne,
>> I updated the KIP to add these methods to the ReplicationPolicy instead
>> of an extra interface to simplify the changes. Please have a look and let
>> me know your thoughts.
>>
>> Thanks
>>
>> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>>> *(sorry, forgot to Reply to All)*
>>> Hi Ryanne,
>>> It's a valid concern. I was trying to keep the internal-topic and
>>> replicated-topic policies separate and to keep the code readable, as
>>> extending ReplicationPolicy to manage both internal and replicated topics
>>> is a bit odd. I'm not against simplifying things by making
>>> ReplicationPolicy handle both: at the end of the day, if an MM2 user has a
>>> special naming convention for topics, it will affect both replicated and
>>> MM2 internal topics.
>>>
>>> To simplify things, we can extend `ReplicationPolicy` as follows instead
>>> of adding an extra class:
>>>
>>>> public interface ReplicationPolicy {
>>>>     String topicSource(String topic);
>>>>     String upstreamTopic(String topic);
>>>>
>>>>     /** Returns the heartbeats topic name. */
>>>>     String heartbeatsTopic();
>>>>
>>>>     /** Returns the offset-syncs topic for the given cluster alias. */
>>>>     String offsetSyncTopic(String targetAlias);
>>>>
>>>>     /** Returns the name of the checkpoint topic for the given cluster alias. */
>>>>     String checkpointTopic(String sourceAlias);
>>>>
>>>>     default String originalTopic(String topic) {
>>>>         String upstream = upstreamTopic(topic);
>>>>         if (upstream == null) {
>>>>             return topic;
>>>>         } else {
>>>>             return originalTopic(upstream);
>>>>         }
>>>>     }
>>>>
>>>>     /** Internal topics are never replicated. */
>>>>     // The implementation will be moved to `DefaultReplicationPolicy` to
>>>>     // handle both Kafka topics and MM2 internal topics.
>>>>     boolean isInternalTopic(String topic);
>>>> }
>>>>
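
To illustrate how an implementation of the interface above might use a separator, here is a rough sketch. The class `SeparatorPolicySketch`, the way the separator is woven into the internal topic names, and the `__` prefix check for Kafka internal topics are all assumptions made for illustration; this is not the actual `DefaultReplicationPolicy` code.

```java
class SeparatorPolicySketch {
    private final String separator;

    SeparatorPolicySketch(String separator) {
        this.separator = separator;
    }

    /** Strips the leading "<alias><separator>" prefix, or returns null if there is none. */
    String upstreamTopic(String topic) {
        int idx = topic.indexOf(separator);
        return idx < 0 ? null : topic.substring(idx + separator.length());
    }

    /** Same recursion as the default method in the proposed interface. */
    String originalTopic(String topic) {
        String upstream = upstreamTopic(topic);
        return upstream == null ? topic : originalTopic(upstream);
    }

    /** Assumed naming: the original offset-syncs name with the separator swapped in. */
    String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs" + separator + targetAlias + separator + "internal";
    }

    /** Assumed naming: the original checkpoints name with the separator swapped in. */
    String checkpointTopic(String sourceAlias) {
        return sourceAlias + separator + "checkpoints" + separator + "internal";
    }

    /** Assumed rule: Kafka internal topics start with "__", MM2 ones end in "<separator>internal". */
    boolean isInternalTopic(String topic) {
        return topic.startsWith("__") || topic.endsWith(separator + "internal");
    }

    public static void main(String[] args) {
        SeparatorPolicySketch policy = new SeparatorPolicySketch("_");
        System.out.println(policy.originalTopic("us_eu_orders"));
        System.out.println(policy.checkpointTopic("primary"));
        System.out.println(policy.isInternalTopic(policy.offsetSyncTopic("backup")));
    }
}
```

Keeping the naming and the internal-topic check in one class is what lets both follow the same separator.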
>>>
>>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com>
>>> wrote:
>>>
>>>> Omnia, have we considered just adding methods to ReplicationPolicy? I'm
>>>> reluctant to add a new class because, as Mickael points out, we'd need to
>>>> carry it around in client code.
>>>>
>>>> Ryanne
>>>>
>>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <
>>>> mickael.mai...@gmail.com> wrote:
>>>>
>>>>> Hi Omnia,
>>>>>
>>>>> Thanks for the clarifications.
>>>>>
>>>>> - I'm still a bit uneasy with the overlap between these 2 methods as
>>>>> currently `ReplicationPolicy.isInternalTopic` already handles MM2
>>>>> internal topics. Should we make it only handle Kafka internal topics
>>>>> and `isMM2InternalTopic()` only handle MM2 topics?
>>>>>
>>>>> - I'm not sure I understand what this method is used for. There are no
>>>>> such methods for the other 2 topics (offset-sync and heartbeat). Also
>>>>> what happens if there are other MM2 instances using different naming
>>>>> schemes in the same cluster. Do all instances have to know about the
>>>>> other naming schemes? What are the expected issues if they don't?
>>>>>
>>>>> - RemoteClusterUtils is a client-side utility so it does not have
>>>>> access to the MM2 configuration. Since this new API can affect the
>>>>> name of the checkpoint topic, it will need to be used client-side too
>>>>> so users can find the checkpoint topic name. I hadn't realized this
>>>>> was the case.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> > Hi Mickael, did you have some time to check my answer?
>>>>> >
>>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hi Mickael,
>>>>> >> Thanks for taking another look at the KIP. Regarding your questions:
>>>>> >>
>>>>> >> - I believe we need both "isMM2InternalTopic" and
>>>>> >> `ReplicationPolicy.isInternalTopic`, as
>>>>> >> `ReplicationPolicy.isInternalTopic` checks whether a topic is a Kafka
>>>>> >> internal topic, while `isMM2InternalTopic` only checks whether a topic
>>>>> >> is an MM2 internal topic (that is, heartbeat/checkpoint/offset-sync).
>>>>> >> The assumption that the default MM2 internal topic names match
>>>>> >> "ReplicationPolicy.isInternalTopic" will no longer be accurate once we
>>>>> >> implement this KIP.
>>>>> >>
>>>>> >> - "isCheckpointTopic" will detect all checkpoint topics for all MM2
>>>>> >> instances. This is needed for "MirrorClient.checkpointTopics", which
>>>>> >> originally checks whether the topic name ends with
>>>>> >> CHECKPOINTS_TOPIC_SUFFIX. So this method just keeps the same
>>>>> >> functionality that already exists in MM2.
>>>>> >>
>>>>> >> - "checkpointTopic" is used in two places: 1. at topic creation in
>>>>> >> "MirrorCheckpointConnector.createInternalTopics", which uses
>>>>> >> "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX", and 2. in
>>>>> >> "MirrorClient.remoteConsumerOffsets", which is called by
>>>>> >> "RemoteClusterUtils.translateOffsets". There the cluster alias is
>>>>> >> referred to as "remoteCluster" and the topic name is
>>>>> >> "remoteClusterAlias + CHECKPOINTS_TOPIC_SUFFIX" (the alias is an
>>>>> >> argument in RemoteClusterUtils, not a config). This is why I called the
>>>>> >> variable cluster instead of source, and why I did not use the config to
>>>>> >> figure out the cluster aliases: it keeps `RemoteClusterUtils`
>>>>> >> compatible for existing users. I see a benefit in just reading the
>>>>> >> config to find the cluster aliases, but on the other hand I'm not sure
>>>>> >> why "RemoteClusterUtils" doesn't get the name of the cluster from the
>>>>> >> properties instead of an argument, so I decided to keep it just for
>>>>> >> compatibility.
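
The suffix-based behaviour described in the points above can be sketched roughly as follows. The class `CheckpointTopicsSketch` and its static helpers are hypothetical stand-ins, not the actual `MirrorClient` code; only the `.checkpoints.internal` suffix and the "alias + suffix" naming come from this thread.

```java
import java.util.List;
import java.util.stream.Collectors;

class CheckpointTopicsSketch {
    // Suffix used by the existing naming convention.
    static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal";

    /** Builds the checkpoint topic name as "alias + suffix". */
    static String checkpointTopic(String clusterAlias) {
        return clusterAlias + CHECKPOINTS_TOPIC_SUFFIX;
    }

    /** Matches checkpoint topics of any MM2 instance that follows the suffix convention. */
    static boolean isCheckpointTopic(String topic) {
        return topic.endsWith(CHECKPOINTS_TOPIC_SUFFIX);
    }

    /** Filters a topic listing down to checkpoint topics. */
    static List<String> checkpointTopics(List<String> allTopics) {
        return allTopics.stream()
                .filter(CheckpointTopicsSketch::isCheckpointTopic)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "primary.checkpoints.internal", "orders",
                "backup.checkpoints.internal", "heartbeats");
        System.out.println(checkpointTopic("primary"));
        System.out.println(checkpointTopics(topics));
    }
}
```

Once topic names become customisable, the `endsWith` check no longer holds in general, which is why the detection has to move behind the policy.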
>>>>> >>
>>>>> >> Hope these answer some of your concerns.
>>>>> >> Best
>>>>> >> Omnia
>>>>> >>
>>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <
>>>>> mickael.mai...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Hi Omnia,
>>>>> >>>
>>>>> >>> Thanks for the updates. Sorry for the delay but I have a few more
>>>>> >>> small questions about the API:
>>>>> >>> - Do we really need "isMM2InternalTopic()"? There's already
>>>>> >>> "ReplicationPolicy.isInternalTopic()". If so, we need to explain
>>>>> the
>>>>> >>> difference between these 2 methods.
>>>>> >>>
>>>>> >>> - Is "isCheckpointTopic()" expected to detect all checkpoint topics
>>>>> >>> (for all MM2 instances) or only the ones for this connector
>>>>> instance.
>>>>> >>> If it's the latter, I wonder if we could do without the method. As
>>>>> this
>>>>> >>> interface is only called by MM2, we could first call
>>>>> >>> "checkpointTopic()" and check if that's equal to the topic we're
>>>>> >>> checking. If it's the former, we don't really know topic names
>>>>> other
>>>>> >>> MM2 instances may be using!
>>>>> >>>
>>>>> >>> - The 3 methods returning topic names have different APIs:
>>>>> >>> "heartbeatsTopic()" takes no arguments, "offsetSyncTopic()" takes
>>>>> the
>>>>> >>> target cluster alias and "checkpointTopic()" takes "clusterAlias"
>>>>> >>> (which one is it? source or target?). As the interface extends
>>>>> >>> Configurable, maybe we could get rid of all the arguments and use
>>>>> the
>>>>> >>> config to find the cluster aliases. WDYT?
>>>>> >>>
>>>>> >>> These are minor concerns, just making sure I fully understand how
>>>>> the
>>>>> >>> API is expected to be used. Once these are cleared, I'll be happy
>>>>> to
>>>>> >>> vote for this KIP.
>>>>> >>>
>>>>> >>> Thanks
>>>>> >>>
>>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >
>>>>> >>> > Hi Mickael,
>>>>> >>> > Did you get time to review the changes to the KIP? If you are okay
>>>>> >>> > with it, could you vote for the KIP here:
>>>>> >>> > https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>>> >>> > Thanks
>>>>> >>> >
>>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >>
>>>>> >>> >> Hi Mickael,
>>>>> >>> >> 1) That's right, the interface and default implementation will be
>>>>> >>> >> in mirror-connect
>>>>> >>> >> 2) Renaming the interface should be fine too, especially if you are
>>>>> >>> >> planning to move other functionality related to the creation there.
>>>>> >>> >> I can edit this.
>>>>> >>> >>
>>>>> >>> >> If you are okay with that, please vote for the KIP here:
>>>>> >>> >> https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>>> >>> >>
>>>>> >>> >>
>>>>> >>> >> Thanks
>>>>> >>> >> Omnia
>>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <
>>>>> mickael.mai...@gmail.com> wrote:
>>>>> >>> >>>
>>>>> >>> >>> Hi Omnia,
>>>>> >>> >>>
>>>>> >>> >>> Thank you for the reply, it makes sense.
>>>>> >>> >>>
>>>>> >>> >>> A couple more comments:
>>>>> >>> >>>
>>>>> >>> >>> 1) I'm assuming the new interface and default implementation
>>>>> will be
>>>>> >>> >>> in the mirror-client project? as the names of some of these
>>>>> topics are
>>>>> >>> >>> needed by RemoteClusterUtils on the client-side.
>>>>> >>> >>>
>>>>> >>> >>> 2) I'm about to open a KIP to specify where the offset-syncs
>>>>> topic is
>>>>> >>> >>> created by MM2. In restricted environments, we'd prefer MM2 to
>>>>> only
>>>>> >>> >>> have read access to the source cluster and have the
>>>>> offset-syncs on
>>>>> >>> >>> the target cluster. I think allowing to specify the cluster
>>>>> where to
>>>>> >>> >>> create that topic would be a natural extension of the
>>>>> interface you
>>>>> >>> >>> propose here.
>>>>> >>> >>>
>>>>> >>> >>> So I wonder if your interface could be named
>>>>> InternalTopicsPolicy?
>>>>> >>> >>> That's a bit more generic than InternalTopicNamingPolicy. That
>>>>> would
>>>>> >>> >>> also match the configuration setting,
>>>>> internal.topics.policy.class,
>>>>> >>> >>> you're proposing.
>>>>> >>> >>>
>>>>> >>> >>> Thanks
>>>>> >>> >>>
>>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com> wrote:
>>>>> >>> >>> >
>>>>> >>> >>> > Hi Mickael,
>>>>> >>> >>> > Thanks for your feedback!
>>>>> >>> >>> > Regarding your question about having more configurations: I
>>>>> >>> >>> > considered adding a configuration per topic. However, this meant
>>>>> >>> >>> > adding more configurations to MM2, which already has so many.
>>>>> >>> >>> > Also, the more complicated and advanced your replication pattern
>>>>> >>> >>> > between clusters is, the more configuration lines will be added
>>>>> >>> >>> > to your MM2 config, which isn't going to be pretty if you don't
>>>>> >>> >>> > have the same topic names across your clusters.
>>>>> >>> >>> >
>>>>> >>> >>> > Also, it added more complexity to the implementation, as MM2
>>>>> >>> >>> > needs to:
>>>>> >>> >>> > 1- identify whether a topic is a checkpoints topic, so we can
>>>>> >>> >>> > list the checkpoints topics in the MirrorMaker 2 utils, as one
>>>>> >>> >>> > cluster could have X checkpoints topics if it's connected to X
>>>>> >>> >>> > clusters. This is done right now by listing any topic with the
>>>>> >>> >>> > suffix `.checkpoints.internal`. This could be done by adding a
>>>>> >>> >>> > `checkpoints.topic.suffix` config, but this would assume that
>>>>> >>> >>> > checkpoints topics always have a suffix. Having a suffix also
>>>>> >>> >>> > means that we may need a separator as well, to concatenate this
>>>>> >>> >>> > suffix with a prefix that identifies the source cluster name.
>>>>> >>> >>> > 2- identify whether a topic is internal, so that it isn't
>>>>> >>> >>> > replicated and no checkpoints are tracked for it. Right now this
>>>>> >>> >>> > relies on excluding topics with the `.internal` suffix from
>>>>> >>> >>> > replication and from checkpoint tracking, but with configurable
>>>>> >>> >>> > topic names we need a way to define what an internal topic is.
>>>>> >>> >>> > This could be done by using a list of all internal topics that
>>>>> >>> >>> > have been entered in the configuration.
>>>>> >>> >>> >
>>>>> >>> >>> > So having an interface seemed easier, and it also gives users
>>>>> >>> >>> > more flexibility to define their own topic names, define what an
>>>>> >>> >>> > internal topic means, and how to find checkpoints topics. It will
>>>>> >>> >>> > be a one-line config for each herder. It is also more consistent
>>>>> >>> >>> > with the MM2 code, as the MM2 config has TopicFilter,
>>>>> >>> >>> > ReplicationPolicy, GroupFilter, etc. as interfaces, and they can
>>>>> >>> >>> > be overridden by providing a custom implementation or by some
>>>>> >>> >>> > config that changes their default implementations.
>>>>> >>> >>> >
>>>>> >>> >>> > Hope this answers your question. I also updated the KIP to add
>>>>> >>> >>> > this to the rejected solutions.
>>>>> >>> >>> >
>>>>> >>> >>> >
>>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <
>>>>> mickael.mai...@gmail.com>
>>>>> >>> >>> > wrote:
>>>>> >>> >>> >
>>>>> >>> >>> > > Hi Omnia,
>>>>> >>> >>> > >
>>>>> >>> >>> > > Thanks for the KIP. Indeed being able to configure MM2's
>>>>> internal
>>>>> >>> >>> > > topic names would be a nice improvement.
>>>>> >>> >>> > >
>>>>> >>> >>> > > Looking at the KIP, I was surprised you propose an
>>>>> interface to allow
>>>>> >>> >>> > > users to specify names. Have you considered making names
>>>>> changeable
>>>>> >>> >>> > > via configurations? If so, we should definitely mention it
>>>>> in the
>>>>> >>> >>> > > rejected alternatives as it's the first method that comes
>>>>> to mind.
>>>>> >>> >>> > >
>>>>> >>> >>> > > I understand an interface gives a lot of flexibility but
>>>>> I'd expect
>>>>> >>> >>> > > topic names to be relatively simple and known in advance
>>>>> in most
>>>>> >>> >>> > > cases.
>>>>> >>> >>> > >
>>>>> >>> >>> > > I've not checked all use cases but something like below
>>>>> felt appropriate:
>>>>> >>> >>> > > clusters = primary,backup
>>>>> >>> >>> > > primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync
>>>>> >>> >>> > >
>>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com>
>>>>> >>> >>> > > wrote:
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Hey everyone,
>>>>> >>> >>> > > > Please take a look at KIP-690:
>>>>> >>> >>> > > >
>>>>> >>> >>> > > >
>>>>> >>> >>> > >
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Thanks for your feedback and support.
>>>>> >>> >>> > > >
>>>>> >>> >>> > > > Omnia
>>>>> >>> >>> > > >
>>>>> >>> >>> > >
>>>>>
>>>>
