Hi Omnia,

I think the current proposal makes sense. Thanks for driving this feature.

On Mon, Jun 21, 2021 at 11:51 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> Omnia, I agree with you that allowing users to specify the whole topic name 
> via configuration is likely to create problems. MM2 must distinguish between 
> internal topics from different clusters, and pushing that complexity into 
> configuration sounds really complicated.
>
> I like the ReplicationPolicy approach.
>
> Ryanne
>
> On Mon, Jun 21, 2021, 2:04 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>> Any thoughts on this KIP?
>>
>>
>> On Thu, Jun 17, 2021 at 1:38 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>> > Another reason why I think adding a configuration for each internal topic 
>> > is not a good solution is how MM2 is naming these topics at the moment.
>> > Right now MM2 sets the name of the offset-syncs topic to 
>> > mm2-offset-syncs.<cluster-alias>.internal and for checkpoints is 
>> > <cluster-alias>.checkpoints.internal so the name has a pattern to link it 
>> > back to the herder of source -> target mirror link so having this in 
>> > configuration will lead to
>> > 1. having a method that determines the final name of internal topics for 
>> > backward compatibility and have this method to be the default of the 
>> > configuration values. The challenge here is that we need to load first the 
>> > clusters alias to calculate the default value for offset-syncs.topic.name 
>> > and checkpoints.topic.name.
>> > 2. Consider use cases where MM2 is used to mirror between multiple 
>> > clusters, for example:
>> >         source1 -> target.enabled = true
>> >         source2 -> target.enabled = true
>> > For this use-case the current behaviour will create the following 
>> > offset-syncs and checkpoints on each cluster:
>> > source1 cluster
>> >         - mm2-offset-syncs.target.internal
>> > source2 cluster
>> >         - mm2-offset-syncs.target.internal
>> > target cluster
>> >         - source1.checkpoints.internal
>> >         - source2.checkpoints.internal
>> > As MM2 design in the original KIP-382 is spliting internal topics bsed on 
>> > mirroring links. Now if we let MM2 users set the full name of these topics 
>> > as configuration, how will we detect if the user has a wrong configuration 
>> > where they used the same name for checkpoints topic for both source1 and 
>> > source2. How this will work if both source1 and source2 clusters have 
>> > consumer groups with same ids as checkpoints topic messages contains 
>> > consumer group id? Should we warn the MM2 user that this topic has been 
>> > used before for another source cluster? If not how will the MM2 user 
>> > notice that?
>> >
>> >
>> > On Mon, Jun 14, 2021 at 5:54 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> > wrote:
>> >
>> >> Hi folks, let me try to clarify some of your concerns and questions.
>> >>
>> >> Mickael: Have you considered making names changeable via configurations?
>> >>>
>> >>
>> >> Gwen: may be missing something, but we are looking at 3 new configs (one
>> >>> for each topic). And this rejected alternative is basically identical to
>> >>> what Connect already does (you can choose names for internal topics using
>> >>> configs).
>> >>>
>> >>> These are valid points. The reasons why we should prefer an interface
>> >> (the current proposal is using the ReplicationPolicy interface which
>> >> already exists in MM2) instead are
>> >>
>> >> 1. the number of configurations that MM2 has. Right now MM2 has its own
>> >> set of configuration in addition to configuration for admin, consumer and
>> >> producer clients and Connect API. And these configurations in some
>> >> use-cases could be different based on the herder.
>> >>
>> >> Consider a use case where MM2 is used to mirror between a set of clusters
>> >> running by different teams and have different naming policies. So if we 
>> >> are
>> >> using 3 configurations for internal topics for a use case like below the
>> >> configuration will be like this. If the number of policies grows, the
>> >> amount of configuration can get unwieldy.
>> >>
>> >> clusters = newCenterCluster, teamACluster, teamBCluster, ...
>> >>
>> >> //newCenterCluster policy is <teamName>.<topicName>
>> >> //teamACluster naming policy is <app>_<topicName> when move to 
>> >> newCenterCluster it will be teamA.<app>_<topicName>
>> >> //teamBCluster naming policy is <domain>.<topicName> when move to 
>> >> newCenterCluster it will be teamB.<domain>_<topicName>
>> >>
>> >> //The goal is to move all topics from team-specific cluster to one new 
>> >> cluster
>> >> // where the org can unify resource management and naming conventions
>> >>
>> >> replication.policy.class=MyCustomReplicationPolicy
>> >>
>> >> teamACluster.heartbeat.topic=mm2_heartbeat_topic // created on source 
>> >> cluster
>> >> teamACluster->newCenterCluster.offsets-sync.topic=mm2_my_offset_sync_topic
>> >>  //created on source cluster at the moment
>> >> teamACluster->newCenterCluster.checkpoints.topic=teamA.mm2_checkpoint_topic
>> >>  //created on target cluster which newCenterCluster
>> >>
>> >> teamBCluster.heartbeat.topic=mm2.heartbeat_topic // created on source 
>> >> cluster
>> >> teamBCluster->newCenterCluster.offsets-sync.topic=mm2.my_offset_sync_topic
>> >>  //created on source cluster at the moment
>> >> teamBCluster->newCenterCluster.checkpoints.topic=teamB.mm2_checkpoint_topic
>> >>  //created on target cluster which newCenterCluster
>> >>
>> >> teamACluster.config.storage.topic=...
>> >> teamACluster.offset.storage.topic=...
>> >> teamACluster.status.storage.topic=...
>> >>
>> >> teamBCluster.config.storage.topic=...
>> >> teamBCluster.offset.storage.topic=...
>> >> teamBCluster.status.storage.topic=...
>> >>
>> >>
>> >> teamACluster→newCenterCluster.enabled=true
>> >> teamACluster→newCenterCluster.enabled=true
>> >>
>> >> 2. The other reason is what Mickael mentioned regards a future KIP to
>> >> move offset-syncs on the target cluster.
>> >>
>> >>> Mickael: I'm about to open a KIP to specify where the offset-syncs topic
>> >>> is created by MM2. In restricted environments, we'd prefer MM2 to only 
>> >>> have
>> >>> read access to the source cluster and have the offset-syncs on the target
>> >>> cluster. I think allowing to specify the cluster where to create that 
>> >>> topic
>> >>> would be a natural extension of the interface you propose here.
>> >>>
>> >>
>> >> In this case, where you want to achieve “read-only” on the source cluster
>> >> then using ReplicationPolicy to name the offset-syncs topic makes more
>> >> sense as ReplicationPolicy holds the implementation of how topics will be
>> >> named on the target cluster where MM2 will have “write” access.
>> >> For example, the user can provide their own naming convention for the
>> >> target cluster as part of replication.policy.class =
>> >> MyCustomReplicationPolicy where it formate the name of the replicated
>> >> topic to be <something>.<topic>.<something>.Now if we have also an extra
>> >> config for internal topics that will also be created at target with the
>> >> similar naming convention <something>.<internal_topic>.<something> this
>> >> means the user will need to add
>> >> offset-sync.topic=<something>.<offset_topic_name>.<something> this feels
>> >> like a duplication for me as we could achieve it by re-using the logic 
>> >> from
>> >>  ReplicationPolicy
>> >>
>> >> So using the replication interface we can define
>> >> MyCustomReplicationPolicy like this following
>> >>
>> >> public class *MyCustomReplicationPolicy* implements ReplicationPolicy {
>> >>     @Override
>> >>     //How to rename remote topics
>> >>     public String *formatRemoteTopic*(String sourceClusterAlias, String 
>> >> topic) {
>> >>         return nameTopicOnTarger(sourceClusterAlias, topic, "mirrored");
>> >>     }
>> >>
>> >>     @Override
>> >>     String *offsetSyncTopic*(String clusterAlias) {
>> >>         // offset-sync topic will be created on target cluster so it need 
>> >> to follow
>> >>         // naming convention of target cluster
>> >>         return nameTopicOnTarget(clusterAlias, "mm2-offset-sync", 
>> >> "internal");
>> >>     }
>> >>
>> >>     @Override
>> >>     String *checkpointsTopic*(String clusterAlias) {
>> >>         // checkpoints topic is created on target cluster so it need to 
>> >> follow
>> >>         // naming convention of target cluster
>> >>         return nameTopicOnTarget(clusterAlias, "mm2-checkpoints", 
>> >> "internal");
>> >>     }
>> >>
>> >>     private String *nameTopicOnTarget*(String prefix, String topic, 
>> >> String suffix) {
>> >>             return prefix + separator + topic + seprator + suffix;
>> >>     }
>> >> }
>> >>
>> >> and MM2 configs will be
>> >>
>> >> *newCenterCluster.replication.policy.class=MyCustomReplicationPolicy*
>> >>
>> >> Mickael: I understand an interface gives a lot of flexibility but I'd
>> >>> expect topic names to be relatively simple and known in advance in most
>> >>> cases.
>> >>>
>> >>
>> >> Gwen: How is it too complex?
>> >>>
>> >>
>> >> The reason I opted for this solution wasn't about complexity but rather
>> >> the number of configurations that MM2 already has. In addition to the 
>> >> point
>> >> above regarding Mickael's future KIP to move offset-sync topic target
>> >> cluster.
>> >>
>> >>
>> >> Gwen: There is a discussion about how this gets more complicated in some
>> >>> scenarios, but it was a bit abstract - is there a concrete case that 
>> >>> shows
>> >>> why the configuration is more complicated than implementing a plugin?
>> >>>
>> >>
>> >> The use case I have in mind is something like my example above. Also
>> >> considering that Mickael is planning to open a KIP to restrict the access
>> >> to the source cluster and have the offset mapping topics on the target
>> >> cluster this means using the ReplicationPolicy that handle naming policies
>> >> at the target cluster makes it easier because if the target cluster has a
>> >> restricted naming convention it will be applied on all topics created 
>> >> there.
>> >>
>> >> Another consideration is a future KIP I’ll be raising soon to abstract
>> >> how MM2 creates topics on target cluster, say for example to integrate 
>> >> with
>> >> a centralized resource management system. A custom ReplicationPolicy will
>> >> give users more flexibility in this kind of integration and having to
>> >> manage what can potentially be a lot less configuration.
>> >>
>> >>
>> >> I’m not strongly attached to using an interface as the solution for
>> >> naming internal topics however, I feel it will give us more flexibility
>> >> with future KIPs.
>> >>
>> >> I would appreciate getting feedback from you both. And if the majority
>> >> feels like the configurations are easier I can change the proposal to 
>> >> this.
>> >>
>> >> --
>> >> Omnia
>> >>
>> >>
>> >> On Sat, May 15, 2021 at 5:59 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hey Gwen,
>> >>> Thanks for having a look into the KIP, regard your question
>> >>>
>> >>>> is there a concrete case that shows why configuration is more
>> >>>> complicated than implementing a plugin?
>> >>>>
>> >>>
>> >>> The case I had in mind is where an organisation with medium-to-large
>> >>> size have multiple teams running their own Kafka clusters without a
>> >>> centralized team that runs Kafka and enforce the same rule for naming
>> >>> resources everywhere.
>> >>> In this environment, if the organization for example is trying to use
>> >>> MM2 to migrate all their data into one place or the data is needed on 
>> >>> both
>> >>> clusters for any business use cases (like needs aggregate data from
>> >>> different clusters), adding extra config for each internal topic will
>> >>> increase the amount of configuration need to run MM2 for this case.
>> >>>
>> >>> The other concerns I have in general are
>> >>>
>> >>> 1. The popular usage of MM2 is replicating topics between clusters
>> >>> running by the same team, in this case, it's most likely that if this 
>> >>> team
>> >>> has a naming topic's rule, this same rule will be applied to both
>> >>> replicated and some of the internal topics of MM2. If we added config
>> >>> like connect to each internal topic then these customers will end up 
>> >>> adding
>> >>> 4 configs just to handle the same naming rule, 1 to include customised
>> >>> replication policy for naming replicated topics + extra 3 configs for the
>> >>> internals to match the same rule.
>> >>>
>> >>> 2. The replication policy plugin already implemented in the MM2, and
>> >>> many customers have already their own customised implementation (there're
>> >>> few implementations flying around already for this policy in the 
>> >>> community)
>> >>> so we aren't adding extra configuration instead we are expanding the
>> >>> responsibility of the policy.
>> >>>
>> >>> Am happy to make it align with connect configs if others disagree with
>> >>> my concerns, at the end my number one goal is to make it flexible to name
>> >>> these topics. I opt-in for an interface based solution so I can minimize
>> >>> the number of config customers need to add.
>> >>>
>> >>> Omnia
>> >>>
>> >>>
>> >>> On Thu, May 13, 2021 at 9:30 PM Gwen Shapira <g...@confluent.io.invalid>
>> >>> wrote:
>> >>>
>> >>>> Hey, sorry for arriving late, but I have a question about the rejected
>> >>>> alternative involving configuration.
>> >>>>
>> >>>> I may be missing something, but we are looking at 3 new configs (one for
>> >>>> each topic). And this rejected alternative is basically identical to
>> >>>> what
>> >>>> Connect already does (you can choose names for internal topics using
>> >>>> configs). How is it too complex? As a user, configuring 3 fields seems
>> >>>> much
>> >>>> simpler than implementing a class. As an admin, trusting to run
>> >>>> someone's
>> >>>> code is scary, but a config with topic name is very safe and easy to
>> >>>> test
>> >>>> and trust.
>> >>>>
>> >>>> There is a discussion about how this gets more complicated in some
>> >>>> scenarios, but it was a bit abstract - is there a concrete case that
>> >>>> shows
>> >>>> why configuration is more complicated than implementing a plugin?
>> >>>>
>> >>>> Gwen
>> >>>>
>> >>>> On Fri, Apr 30, 2021 at 2:30 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>> > Hi Ryanne, Can you vote for it here
>> >>>> > https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html as
>> >>>> well,
>> >>>> > please?
>> >>>> >
>> >>>> > On Fri, Apr 30, 2021 at 12:44 AM Ryanne Dolan <ryannedo...@gmail.com>
>> >>>> > wrote:
>> >>>> >
>> >>>> > > Thanks Omnia. lgtm!
>> >>>> > >
>> >>>> > > Ryanne
>> >>>> > >
>> >>>> > > On Thu, Apr 29, 2021 at 10:50 AM Omnia Ibrahim <
>> >>>> o.g.h.ibra...@gmail.com>
>> >>>> > > wrote:
>> >>>> > >
>> >>>> > >> I updated the KIP
>> >>>> > >>
>> >>>> > >> On Thu, Apr 29, 2021 at 4:43 PM Omnia Ibrahim <
>> >>>> o.g.h.ibra...@gmail.com>
>> >>>> > >> wrote:
>> >>>> > >>
>> >>>> > >>> Sure, this would make it easier, we can make these functions
>> >>>> returns
>> >>>> > the
>> >>>> > >>> original behaviour (<clusterAlias>.checkpoints.internal,
>> >>>> > >>> "mm2-offset-syncs.<clusterAlias>.internal", heartbeat) without
>> >>>> any
>> >>>> > >>> customisation using `replication.policy.separator` and use the
>> >>>> > separator in
>> >>>> > >>> the DefaultReplicationPolicy
>> >>>> > >>>
>> >>>> > >>> On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan <
>> >>>> ryannedo...@gmail.com>
>> >>>> > >>> wrote:
>> >>>> > >>>
>> >>>> > >>>> Thanks Omnia, makes sense to me.
>> >>>> > >>>>
>> >>>> > >>>> > Customers who have their customised ReplicationPolicy will
>> >>>> need to
>> >>>> > >>>> add the definition of their internal topics naming convention
>> >>>> > >>>>
>> >>>> > >>>> I wonder should we include default impls in the interface to
>> >>>> avoid
>> >>>> > that
>> >>>> > >>>> requirement?
>> >>>> > >>>>
>> >>>> > >>>> Ryanne
>> >>>> > >>>>
>> >>>> > >>>> On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <
>> >>>> o.g.h.ibra...@gmail.com>
>> >>>> > >>>> wrote:
>> >>>> > >>>>
>> >>>> > >>>>> Hi Mickael and Ryanne,
>> >>>> > >>>>> I updated the KIP to add these methods to the ReplicationPolicy
>> >>>> > >>>>> instead of an extra interface to simplify the changes. Please
>> >>>> have a
>> >>>> > look
>> >>>> > >>>>> and let me know your thoughts.
>> >>>> > >>>>>
>> >>>> > >>>>> Thanks
>> >>>> > >>>>>
>> >>>> > >>>>> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <
>> >>>> > o.g.h.ibra...@gmail.com>
>> >>>> > >>>>> wrote:
>> >>>> > >>>>>
>> >>>> > >>>>>> *(sorry forgot to Replay to All) *
>> >>>> > >>>>>> Hi Ryanne,
>> >>>> > >>>>>> It's a valid concern, I was trying to separate the concerns of
>> >>>> > >>>>>> internal and replicated policy away from each other and to
>> >>>> make the
>> >>>> > code
>> >>>> > >>>>>> readable as extending ReplicationPolicy to manage both
>> >>>> internal and
>> >>>> > >>>>>> replicated topic is a bit odd. Am not against simplifying
>> >>>> things
>> >>>> > out to
>> >>>> > >>>>>> make ReplicationPolicy handling both at the end of the day if
>> >>>> an
>> >>>> > MM2 user
>> >>>> > >>>>>> has a special naming convention for topics it will be
>> >>>> affecting both
>> >>>> > >>>>>> replicated and MM2 internal topics.
>> >>>> > >>>>>>
>> >>>> > >>>>>> For simplifying things we can extend `ReplicationPolicy` to the
>> >>>> > >>>>>> following instead of adding an extra class
>> >>>> > >>>>>>
>> >>>> > >>>>>>> *public interface ReplicationPolicy {*
>> >>>> > >>>>>>>     String topicSource(String topic);
>> >>>> > >>>>>>>     String upstreamTopic(String topic);
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>> */** Returns heartbeats topic name.*/    String
>> >>>> heartbeatsTopic();*
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>> *    /** Returns the offset-syncs topic for given cluster
>> >>>> alias. */
>> >>>> > >>>>>>>   String offsetSyncTopic(String targetAlias);    /** Returns
>> >>>> the
>> >>>> > name
>> >>>> > >>>>>>> checkpoint topic for given cluster alias. */    String
>> >>>> > >>>>>>> checkpointTopic(String sourceAlias); *
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>     default String originalTopic(String topic) {
>> >>>> > >>>>>>>         String upstream = upstreamTopic(topic);
>> >>>> > >>>>>>>         if (upstream == null) {
>> >>>> > >>>>>>>             return topic;
>> >>>> > >>>>>>>         } else {
>> >>>> > >>>>>>>             return originalTopic(upstream);
>> >>>> > >>>>>>>         }
>> >>>> > >>>>>>>     }
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>
>> >>>> > >>>>>>> *    /** Internal topics are never replicated. */
>> >>>> > >>>>>>> isInternalTopic(String topic) *//the implementaion will be
>> >>>> moved to
>> >>>> > >>>>>>> `DefaultReplicationPolicy` to handle both kafka topics and MM2
>> >>>> > internal
>> >>>> > >>>>>>> topics.
>> >>>> > >>>>>>> }
>> >>>> > >>>>>>>
>> >>>> > >>>>>>
>> >>>> > >>>>>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <
>> >>>> ryannedo...@gmail.com
>> >>>> > >
>> >>>> > >>>>>> wrote:
>> >>>> > >>>>>>
>> >>>> > >>>>>>> Omnia, have we considered just adding methods to
>> >>>> ReplicationPolicy?
>> >>>> > >>>>>>> I'm reluctant to add a new class because, as Mickael points
>> >>>> out,
>> >>>> > we'd need
>> >>>> > >>>>>>> to carry it around in client code.
>> >>>> > >>>>>>>
>> >>>> > >>>>>>> Ryanne
>> >>>> > >>>>>>>
>> >>>> > >>>>>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <
>> >>>> > >>>>>>> mickael.mai...@gmail.com> wrote:
>> >>>> > >>>>>>>
>> >>>> > >>>>>>>> Hi Omnia,
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> Thanks for the clarifications.
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> - I'm still a bit uneasy with the overlap between these 2
>> >>>> methods
>> >>>> > as
>> >>>> > >>>>>>>> currently `ReplicationPolicy.isInternalTopic` already
>> >>>> handles MM2
>> >>>> > >>>>>>>> internal topics. Should we make it only handle Kafka internal
>> >>>> > topics
>> >>>> > >>>>>>>> and `isMM2InternalTopic()` only handle MM2 topics?
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> - I'm not sure I understand what this method is used for.
>> >>>> There
>> >>>> > are
>> >>>> > >>>>>>>> no
>> >>>> > >>>>>>>> such methods for the other 2 topics (offset-sync and
>> >>>> heartbeat).
>> >>>> > >>>>>>>> Also
>> >>>> > >>>>>>>> what happens if there are other MM2 instances using different
>> >>>> > naming
>> >>>> > >>>>>>>> schemes in the same cluster. Do all instances have to know
>> >>>> about
>> >>>> > the
>> >>>> > >>>>>>>> other naming schemes? What are the expected issues if they
>> >>>> don't?
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> - RemoteClusterUtils is a client-side utility so it does not
>> >>>> have
>> >>>> > >>>>>>>> access to the MM2 configuration. Since this new API can
>> >>>> affect the
>> >>>> > >>>>>>>> name of the checkpoint topic, it will need to be used
>> >>>> client-side
>> >>>> > >>>>>>>> too
>> >>>> > >>>>>>>> so users can find the checkpoint topic name. I had to
>> >>>> realized
>> >>>> > this
>> >>>> > >>>>>>>> was the case.
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> Thanks
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote:
>> >>>> > >>>>>>>> >
>> >>>> > >>>>>>>> > Hi Mickael, did you have some time to check my answer?
>> >>>> > >>>>>>>> >
>> >>>> > >>>>>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> Hi Mickael,
>> >>>> > >>>>>>>> >> Thanks for taking another look into the KIP, regards your
>> >>>> > >>>>>>>> questions
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> - I believe we need both "isMM2InternalTopic" and
>> >>>> > >>>>>>>> `ReplicationPolicy.isInternalTopic`  as
>> >>>> > `ReplicationPolicy.isInternalTopic`
>> >>>> > >>>>>>>> does check if a topic is Kafka internal topic, while
>> >>>> > `isMM2InternalTopic`
>> >>>> > >>>>>>>> is just focusing if a topic is MM2 internal topic or
>> >>>> not(which is
>> >>>> > >>>>>>>> heartbeat/checkpoint/offset-sync). The fact that the
>> >>>> default for
>> >>>> > MM2
>> >>>> > >>>>>>>> internal topics matches "ReplicationPolicy.isInternalTopic"
>> >>>> will
>> >>>> > not be an
>> >>>> > >>>>>>>> accurate assumption anymore once we implement this KIP.
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> - "isCheckpointTopic" will detect all checkpoint topics
>> >>>> for all
>> >>>> > >>>>>>>> MM2 instances this is needed for "MirrorClient.
>> >>>> checkpointTopics"
>> >>>> > which
>> >>>> > >>>>>>>> originally check if the topic name ends with
>> >>>> > CHECKPOINTS_TOPIC_SUFFIX. So
>> >>>> > >>>>>>>> this method just to keep the same functionality that
>> >>>> originally
>> >>>> > exists in
>> >>>> > >>>>>>>> MM2
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> - "checkpointTopic" is used in two places 1. At topic
>> >>>> creation
>> >>>> > >>>>>>>> in "MirrorCheckpointConnector.createInternalTopics" which
>> >>>> use
>> >>>> > >>>>>>>> "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX" and 2. At
>> >>>> > >>>>>>>> "MirrorClient.remoteConsumerOffsets" which is called by
>> >>>> > >>>>>>>> "RemoteClusterUtils.translateOffsets"  the cluster alias
>> >>>> here
>> >>>> > referred to
>> >>>> > >>>>>>>> as "remoteCluster" where the topic name is
>> >>>> "remoteClusterAlias +
>> >>>> > >>>>>>>> CHECKPOINTS_TOPIC_SUFFIX"  (which is an argument in
>> >>>> > RemoteClusterUtils, not
>> >>>> > >>>>>>>> a config) This why I called the variable cluster instead of
>> >>>> > source and
>> >>>> > >>>>>>>> instead of using the config to figure out the cluster aliases
>> >>>> > from config
>> >>>> > >>>>>>>> as we use checkpoints to keep `RemoteClusterUtils`
>> >>>> compatible for
>> >>>> > existing
>> >>>> > >>>>>>>> users. I see a benefit of just read the config a find out the
>> >>>> > cluster
>> >>>> > >>>>>>>> aliases but on the other side, I'm not sure why
>> >>>> > "RemoteClusterUtils"
>> >>>> > >>>>>>>> doesn't get the name of the cluster from the properties
>> >>>> instead
>> >>>> > of an
>> >>>> > >>>>>>>> argument, so I decided to keep it just for compatibility.
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> Hope these answer some of your concerns.
>> >>>> > >>>>>>>> >> Best
>> >>>> > >>>>>>>> >> Omnia
>> >>>> > >>>>>>>> >>
>> >>>> > >>>>>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <
>> >>>> > >>>>>>>> mickael.mai...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> Hi Omnia,
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> Thanks for the updates. Sorry for the delay but I have a
>> >>>> few
>> >>>> > >>>>>>>> more
>> >>>> > >>>>>>>> >>> small questions about the API:
>> >>>> > >>>>>>>> >>> - Do we really need "isMM2InternalTopic()"? There's
>> >>>> already
>> >>>> > >>>>>>>> >>> "ReplicationPolicy.isInternalTopic()". If so, we need to
>> >>>> > >>>>>>>> explain the
>> >>>> > >>>>>>>> >>> difference between these 2 methods.
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> - Is "isCheckpointTopic()" expected to detect all
>> >>>> checkpoint
>> >>>> > >>>>>>>> topics
>> >>>> > >>>>>>>> >>> (for all MM2 instances) or only the ones for this
>> >>>> connector
>> >>>> > >>>>>>>> instance.
>> >>>> > >>>>>>>> >>> If it's the later, I wonder if we could do without the
>> >>>> method.
>> >>>> > >>>>>>>> As this
>> >>>> > >>>>>>>> >>> interface is only called by MM2, we could first call
>> >>>> > >>>>>>>> >>> "checkpointTopic()" and check if that's equal to the
>> >>>> topic
>> >>>> > we're
>> >>>> > >>>>>>>> >>> checking. If it's the former, we don't really know topic
>> >>>> names
>> >>>> > >>>>>>>> other
>> >>>> > >>>>>>>> >>> MM2 instances may be using!
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> - The 3 methods returning topic names have different
>> >>>> APIs:
>> >>>> > >>>>>>>> >>> "heartbeatsTopic()" takes no arguments,
>> >>>> "offsetSyncTopic()"
>> >>>> > >>>>>>>> takes the
>> >>>> > >>>>>>>> >>> target cluster alias and "checkpointTopic()" takes
>> >>>> > >>>>>>>> "clusterAlias"
>> >>>> > >>>>>>>> >>> (which one is it? source or target?). As the interface
>> >>>> extends
>> >>>> > >>>>>>>> >>> Configurable, maybe we could get rid of all the
>> >>>> arguments and
>> >>>> > >>>>>>>> use the
>> >>>> > >>>>>>>> >>> config to find the cluster aliases. WDYT?
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> These are minor concerns, just making sure I fully
>> >>>> understand
>> >>>> > >>>>>>>> how the
>> >>>> > >>>>>>>> >>> API is expected to be used. Once these are cleared, I'll
>> >>>> be
>> >>>> > >>>>>>>> happy to
>> >>>> > >>>>>>>> >>> vote for this KIP.
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> Thanks
>> >>>> > >>>>>>>> >>>
>> >>>> > >>>>>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>> >
>> >>>> > >>>>>>>> >>> > Hi Mickael,
>> >>>> > >>>>>>>> >>> > Did you get time to review the changes to the KIP? If
>> >>>> you
>> >>>> > >>>>>>>> okay with it could you vote for the KIP here ttps://
>> >>>> > >>>>>>>> www.mail-archive.com/dev@kafka.apache.org/msg113575.html?
>> >>>> > >>>>>>>> >>> > Thanks
>> >>>> > >>>>>>>> >>> >
>> >>>> > >>>>>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>> >>
>> >>>> > >>>>>>>> >>> >> Hi Mickael,
>> >>>> > >>>>>>>> >>> >> 1) That's right the interface and default
>> >>>> implementation
>> >>>> > >>>>>>>> will in mirror-connect
>> >>>> > >>>>>>>> >>> >> 2) Renaming the interface should be fine too
>> >>>> especially if
>> >>>> > >>>>>>>> you planning to move other functionality related to the
>> >>>> creation
>> >>>> > there, I
>> >>>> > >>>>>>>> can edit this
>> >>>> > >>>>>>>> >>> >>
>> >>>> > >>>>>>>> >>> >> if you are okay with that please vote for the KIP here
>> >>>> > >>>>>>>>
>> >>>> https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>> >>>> > >>>>>>>> >>> >>
>> >>>> > >>>>>>>> >>> >>
>> >>>> > >>>>>>>> >>> >> Thanks
>> >>>> > >>>>>>>> >>> >> Omnia
>> >>>> > >>>>>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <
>> >>>> > >>>>>>>> mickael.mai...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> Hi Omnia,
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> Thank you for the reply, it makes sense.
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> A couple more comments:
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> 1) I'm assuming the new interface and default
>> >>>> > >>>>>>>> implementation will be
>> >>>> > >>>>>>>> >>> >>> in the mirror-client project? as the names of some of
>> >>>> > these
>> >>>> > >>>>>>>> topics are
>> >>>> > >>>>>>>> >>> >>> needed by RemoteClusterUtils on the client-side.
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> 2) I'm about to open a KIP to specify where the
>> >>>> > >>>>>>>> offset-syncs topic is
>> >>>> > >>>>>>>> >>> >>> created by MM2. In restricted environments, we'd
>> >>>> prefer
>> >>>> > MM2
>> >>>> > >>>>>>>> to only
>> >>>> > >>>>>>>> >>> >>> have read access to the source cluster and have the
>> >>>> > >>>>>>>> offset-syncs on
>> >>>> > >>>>>>>> >>> >>> the target cluster. I think allowing to specify the
>> >>>> > cluster
>> >>>> > >>>>>>>> where to
>> >>>> > >>>>>>>> >>> >>> create that topic would be a natural extension of the
>> >>>> > >>>>>>>> interface you
>> >>>> > >>>>>>>> >>> >>> propose here.
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> So I wonder if your interface could be named
>> >>>> > >>>>>>>> InternalTopicsPolicy?
>> >>>> > >>>>>>>> >>> >>> That's a bit more generic than
>> >>>> InternalTopicNamingPolicy.
>> >>>> > >>>>>>>> That would
>> >>>> > >>>>>>>> >>> >>> also match the configuration setting,
>> >>>> > >>>>>>>> internal.topics.policy.class,
>> >>>> > >>>>>>>> >>> >>> you're proposing.
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> Thanks
>> >>>> > >>>>>>>> >>> >>>
>> >>>> > >>>>>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote:
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > Hi Mickael,
>> >>>> > >>>>>>>> >>> >>> > Thanks for your feedback!
>> >>>> > >>>>>>>> >>> >>> > Regards your question about having more
>> >>>> configurations,
>> >>>> > I
>> >>>> > >>>>>>>> considered adding
>> >>>> > >>>>>>>> >>> >>> > configuration per each topic however this meant
>> >>>> adding
>> >>>> > >>>>>>>> more configurations
>> >>>> > >>>>>>>> >>> >>> > for MM2 which already have so many, also the more
>> >>>> > >>>>>>>> complicated and advanced
>> >>>> > >>>>>>>> >>> >>> > replication pattern you have between clusters the
>> >>>> more
>> >>>> > >>>>>>>> configuration lines
>> >>>> > >>>>>>>> >>> >>> > will be added to your MM2 config which isn't going
>> >>>> to be
>> >>>> > >>>>>>>> pretty if you
>> >>>> > >>>>>>>> >>> >>> > don't have the same topics names across your
>> >>>> clusters.
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > Also, it added more complexity to the
>> >>>> implementation as
>> >>>> > >>>>>>>> MM2 need to
>> >>>> > >>>>>>>> >>> >>> > 1- identify if a topic is checkpoints so we could
>> >>>> list
>> >>>> > >>>>>>>> the checkpoints
>> >>>> > >>>>>>>> >>> >>> > topics in MirrorMaker 2 utils as one cluster could
>> >>>> have
>> >>>> > X
>> >>>> > >>>>>>>> numbers
>> >>>> > >>>>>>>> >>> >>> > checkpoints topics if it's connected to X
>> >>>> clusters, this
>> >>>> > >>>>>>>> is done right now
>> >>>> > >>>>>>>> >>> >>> > by listing any topic with suffix
>> >>>> > `.checkpoints.internal`.
>> >>>> > >>>>>>>> This could be
>> >>>> > >>>>>>>> >>> >>> > done by add `checkpoints.topic.suffix` config but
>> >>>> this
>> >>>> > >>>>>>>> would make an
>> >>>> > >>>>>>>> >>> >>> > assumption that checkpoints will always have a
>> >>>> suffix
>> >>>> > >>>>>>>> also having a suffix
>> >>>> > >>>>>>>> >>> >>> > means that we may need a separator as well to
>> >>>> > concatenate
>> >>>> > >>>>>>>> this suffix with
>> >>>> > >>>>>>>> >>> >>> > a prefix to identify source cluster name.
>> >>>> > >>>>>>>> >>> >>> > 2- identify if a topic is internal, so it
>> >>>> shouldn't be
>> >>>> > >>>>>>>> replicated or track
>> >>>> > >>>>>>>> >>> >>> > checkpoints for it, right now this is relaying on
>> >>>> > >>>>>>>> disallow topics with
>> >>>> > >>>>>>>> >>> >>> > `.internal` suffix to be not replicated and not
>> >>>> tracked
>> >>>> > >>>>>>>> in checkpoints but
>> >>>> > >>>>>>>> >>> >>> > with making topics configurable we need a way to
>> >>>> define
>> >>>> > >>>>>>>> what is an internal
>> >>>> > >>>>>>>> >>> >>> > topic. This could be done by making using a list
>> >>>> of all
>> >>>> > >>>>>>>> internal topics
>> >>>> > >>>>>>>> >>> >>> > have been entered to the configuration.
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > So having an interface seemed easier and also give
>> >>>> more
>> >>>> > >>>>>>>> flexibility for
>> >>>> > >>>>>>>> >>> >>> > users to define their own topics name, define what
>> >>>> is
>> >>>> > >>>>>>>> internal topic means,
>> >>>> > >>>>>>>> >>> >>> > how to find checkpoints topics and it will be one
>> >>>> line
>> >>>> > >>>>>>>> config for each
>> >>>> > >>>>>>>> >>> >>> > herder, also it more consistence with MM2 code as
>> >>>> MM2
>> >>>> > >>>>>>>> config have
>> >>>> > >>>>>>>> >>> >>> > TopicFilter, ReplicationPolicy, GroupFilter, etc as
>> >>>> > >>>>>>>> interface and they can
>> >>>> > >>>>>>>> >>> >>> > be overridden by providing a custom implementation
>> >>>> for
>> >>>> > >>>>>>>> them or have some
>> >>>> > >>>>>>>> >>> >>> > config that change their default implementations.
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > Hope this answer your question. I also updated the
>> >>>> KIP
>> >>>> > to
>> >>>> > >>>>>>>> add this to the
>> >>>> > >>>>>>>> >>> >>> > rejected solutions.
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <
>> >>>> > >>>>>>>> mickael.mai...@gmail.com>
>> >>>> > >>>>>>>> >>> >>> > wrote:
>> >>>> > >>>>>>>> >>> >>> >
>> >>>> > >>>>>>>> >>> >>> > > Hi Omnia,
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> >>> >>> > > Thanks for the KIP. Indeed being able to
>> >>>> configure
>> >>>> > >>>>>>>> MM2's internal
>> >>>> > >>>>>>>> >>> >>> > > topic names would be a nice improvement.
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> >>> >>> > > Looking at the KIP, I was surprised you propose
>> >>>> an
>> >>>> > >>>>>>>> interface to allow
>> >>>> > >>>>>>>> >>> >>> > > users to specify names. Have you considered
>> >>>> making
>> >>>> > >>>>>>>> names changeable
>> >>>> > >>>>>>>> >>> >>> > > via configurations? If so, we should definitely
>> >>>> > mention
>> >>>> > >>>>>>>> it in the
>> >>>> > >>>>>>>> >>> >>> > > rejected alternatives as it's the first method
>> >>>> that
>> >>>> > >>>>>>>> comes to mind.
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> >>> >>> > > I understand an interface gives a lot of
>> >>>> flexibility
>> >>>> > >>>>>>>> but I'd expect
>> >>>> > >>>>>>>> >>> >>> > > topic names to be relatively simple and known in
>> >>>> > >>>>>>>> advance in most
>> >>>> > >>>>>>>> >>> >>> > > cases.
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> >>> >>> > > I've not checked all use cases but something like
>> >>>> > below
>> >>>> > >>>>>>>> felt appropriate:
>> >>>> > >>>>>>>> >>> >>> > > clusters = primary,backup
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> primary->backup.offsets-sync.topic=backup.mytopic-offsets-
>> >>>> sync
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <
>> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com>
>> >>>> > >>>>>>>> >>> >>> > > wrote:
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > > > Hey everyone,
>> >>>> > >>>>>>>> >>> >>> > > > Please take a look at KIP-690:
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>>
>> >>>> >
>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > > > Thanks for your feedback and support.
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > > > Omnia
>> >>>> > >>>>>>>> >>> >>> > > >
>> >>>> > >>>>>>>> >>> >>> > >
>> >>>> > >>>>>>>>
>> >>>> > >>>>>>>
>> >>>> >
>> >>>>
>> >>>>
>> >>>> --
>> >>>> Gwen Shapira
>> >>>> Engineering Manager | Confluent
>> >>>> 650.450.2760 | @gwenshap
>> >>>> Follow us: Twitter | blog
>> >>>>
>> >>>

Reply via email to