@Gwen, can you please have a second look at the proposal and my explanation for why this approach is better for MM2?
On Thu, Jun 24, 2021 at 5:55 PM Mickael Maison <mickael.mai...@gmail.com> wrote: > Hi Omnia, > > I think the current proposal makes sense. Thanks for driving this feature. > > On Mon, Jun 21, 2021 at 11:51 PM Ryanne Dolan <ryannedo...@gmail.com> > wrote: > > > > Omnia, I agree with you that allowing users to specify the whole topic > name via configuration is likely to create problems. MM2 must distinguish > between internal topics from different clusters, and pushing that > complexity into configuration sounds really complicated. > > > > I like the ReplicationPolicy approach. > > > > Ryanne > > > > On Mon, Jun 21, 2021, 2:04 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> > wrote: > >> > >> Any thoughts on this KIP? > >> > >> > >> On Thu, Jun 17, 2021 at 1:38 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> > >> wrote: > >> > >> > Another reason why I think adding a configuration for each internal > topic is not a good solution is how MM2 is naming these topics at the > moment. > >> > Right now MM2 sets the name of the offset-syncs topic to > mm2-offset-syncs.<cluster-alias>.internal and for checkpoints is > <cluster-alias>.checkpoints.internal so the name has a pattern to link it > back to the herder of source -> target mirror link so having this in > configuration will lead to > >> > 1. having a method that determines the final name of internal topics > for backward compatibility and have this method to be the default of the > configuration values. The challenge here is that we need to load first the > clusters alias to calculate the default value for offset-syncs.topic.name > and checkpoints.topic.name. > >> > 2. Consider use cases where MM2 is used to mirror between multiple > clusters, for example: > >> > source1 -> target.enabled = true > >> > source2 -> target.enabled = true > >> > For this use-case the current behaviour will create the following > offset-syncs and checkpoints on each cluster: > >> > source1 cluster > >> > - mm2-offset-syncs.target.internal > >> > source2 cluster > >> > - mm2-offset-syncs.target.internal > >> > target cluster > >> > - source1.checkpoints.internal > >> > - source2.checkpoints.internal > >> > As MM2 design in the original KIP-382 is spliting internal topics > bsed on mirroring links. Now if we let MM2 users set the full name of these > topics as configuration, how will we detect if the user has a wrong > configuration where they used the same name for checkpoints topic for both > source1 and source2. How this will work if both source1 and source2 > clusters have consumer groups with same ids as checkpoints topic messages > contains consumer group id? Should we warn the MM2 user that this topic has > been used before for another source cluster? If not how will the MM2 user > notice that? > >> > > >> > > >> > On Mon, Jun 14, 2021 at 5:54 PM Omnia Ibrahim < > o.g.h.ibra...@gmail.com> > >> > wrote: > >> > > >> >> Hi folks, let me try to clarify some of your concerns and questions. > >> >> > >> >> Mickael: Have you considered making names changeable via > configurations? > >> >>> > >> >> > >> >> Gwen: may be missing something, but we are looking at 3 new configs > (one > >> >>> for each topic). And this rejected alternative is basically > identical to > >> >>> what Connect already does (you can choose names for internal topics > using > >> >>> configs). > >> >>> > >> >>> These are valid points. The reasons why we should prefer an > interface > >> >> (the current proposal is using the ReplicationPolicy interface which > >> >> already exists in MM2) instead are > >> >> > >> >> 1. the number of configurations that MM2 has. Right now MM2 has its > own > >> >> set of configuration in addition to configuration for admin, > consumer and > >> >> producer clients and Connect API. And these configurations in some > >> >> use-cases could be different based on the herder. > >> >> > >> >> Consider a use case where MM2 is used to mirror between a set of > clusters > >> >> running by different teams and have different naming policies. So if > we are > >> >> using 3 configurations for internal topics for a use case like below > the > >> >> configuration will be like this. If the number of policies grows, the > >> >> amount of configuration can get unwieldy. > >> >> > >> >> clusters = newCenterCluster, teamACluster, teamBCluster, ... > >> >> > >> >> //newCenterCluster policy is <teamName>.<topicName> > >> >> //teamACluster naming policy is <app>_<topicName> when move to > newCenterCluster it will be teamA.<app>_<topicName> > >> >> //teamBCluster naming policy is <domain>.<topicName> when move to > newCenterCluster it will be teamB.<domain>_<topicName> > >> >> > >> >> //The goal is to move all topics from team-specific cluster to one > new cluster > >> >> // where the org can unify resource management and naming conventions > >> >> > >> >> replication.policy.class=MyCustomReplicationPolicy > >> >> > >> >> teamACluster.heartbeat.topic=mm2_heartbeat_topic // created on > source cluster > >> >> > teamACluster->newCenterCluster.offsets-sync.topic=mm2_my_offset_sync_topic > //created on source cluster at the moment > >> >> > teamACluster->newCenterCluster.checkpoints.topic=teamA.mm2_checkpoint_topic > //created on target cluster which newCenterCluster > >> >> > >> >> teamBCluster.heartbeat.topic=mm2.heartbeat_topic // created on > source cluster > >> >> > teamBCluster->newCenterCluster.offsets-sync.topic=mm2.my_offset_sync_topic > //created on source cluster at the moment > >> >> > teamBCluster->newCenterCluster.checkpoints.topic=teamB.mm2_checkpoint_topic > //created on target cluster which newCenterCluster > >> >> > >> >> teamACluster.config.storage.topic=... > >> >> teamACluster.offset.storage.topic=... > >> >> teamACluster.status.storage.topic=... > >> >> > >> >> teamBCluster.config.storage.topic=... > >> >> teamBCluster.offset.storage.topic=... > >> >> teamBCluster.status.storage.topic=... > >> >> > >> >> > >> >> teamACluster→newCenterCluster.enabled=true > >> >> teamACluster→newCenterCluster.enabled=true > >> >> > >> >> 2. The other reason is what Mickael mentioned regards a future KIP to > >> >> move offset-syncs on the target cluster. > >> >> > >> >>> Mickael: I'm about to open a KIP to specify where the offset-syncs > topic > >> >>> is created by MM2. In restricted environments, we'd prefer MM2 to > only have > >> >>> read access to the source cluster and have the offset-syncs on the > target > >> >>> cluster. I think allowing to specify the cluster where to create > that topic > >> >>> would be a natural extension of the interface you propose here. > >> >>> > >> >> > >> >> In this case, where you want to achieve “read-only” on the source > cluster > >> >> then using ReplicationPolicy to name the offset-syncs topic makes > more > >> >> sense as ReplicationPolicy holds the implementation of how topics > will be > >> >> named on the target cluster where MM2 will have “write” access. > >> >> For example, the user can provide their own naming convention for the > >> >> target cluster as part of replication.policy.class = > >> >> MyCustomReplicationPolicy where it formate the name of the replicated > >> >> topic to be <something>.<topic>.<something>.Now if we have also an > extra > >> >> config for internal topics that will also be created at target with > the > >> >> similar naming convention <something>.<internal_topic>.<something> > this > >> >> means the user will need to add > >> >> offset-sync.topic=<something>.<offset_topic_name>.<something> this > feels > >> >> like a duplication for me as we could achieve it by re-using the > logic from > >> >> ReplicationPolicy > >> >> > >> >> So using the replication interface we can define > >> >> MyCustomReplicationPolicy like this following > >> >> > >> >> public class *MyCustomReplicationPolicy* implements > ReplicationPolicy { > >> >> @Override > >> >> //How to rename remote topics > >> >> public String *formatRemoteTopic*(String sourceClusterAlias, > String topic) { > >> >> return nameTopicOnTarger(sourceClusterAlias, topic, > "mirrored"); > >> >> } > >> >> > >> >> @Override > >> >> String *offsetSyncTopic*(String clusterAlias) { > >> >> // offset-sync topic will be created on target cluster so it > need to follow > >> >> // naming convention of target cluster > >> >> return nameTopicOnTarget(clusterAlias, "mm2-offset-sync", > "internal"); > >> >> } > >> >> > >> >> @Override > >> >> String *checkpointsTopic*(String clusterAlias) { > >> >> // checkpoints topic is created on target cluster so it need > to follow > >> >> // naming convention of target cluster > >> >> return nameTopicOnTarget(clusterAlias, "mm2-checkpoints", > "internal"); > >> >> } > >> >> > >> >> private String *nameTopicOnTarget*(String prefix, String topic, > String suffix) { > >> >> return prefix + separator + topic + seprator + suffix; > >> >> } > >> >> } > >> >> > >> >> and MM2 configs will be > >> >> > >> >> *newCenterCluster.replication.policy.class=MyCustomReplicationPolicy* > >> >> > >> >> Mickael: I understand an interface gives a lot of flexibility but I'd > >> >>> expect topic names to be relatively simple and known in advance in > most > >> >>> cases. > >> >>> > >> >> > >> >> Gwen: How is it too complex? > >> >>> > >> >> > >> >> The reason I opted for this solution wasn't about complexity but > rather > >> >> the number of configurations that MM2 already has. In addition to > the point > >> >> above regarding Mickael's future KIP to move offset-sync topic target > >> >> cluster. > >> >> > >> >> > >> >> Gwen: There is a discussion about how this gets more complicated in > some > >> >>> scenarios, but it was a bit abstract - is there a concrete case > that shows > >> >>> why the configuration is more complicated than implementing a > plugin? > >> >>> > >> >> > >> >> The use case I have in mind is something like my example above. Also > >> >> considering that Mickael is planning to open a KIP to restrict the > access > >> >> to the source cluster and have the offset mapping topics on the > target > >> >> cluster this means using the ReplicationPolicy that handle naming > policies > >> >> at the target cluster makes it easier because if the target cluster > has a > >> >> restricted naming convention it will be applied on all topics > created there. > >> >> > >> >> Another consideration is a future KIP I’ll be raising soon to > abstract > >> >> how MM2 creates topics on target cluster, say for example to > integrate with > >> >> a centralized resource management system. A custom ReplicationPolicy > will > >> >> give users more flexibility in this kind of integration and having to > >> >> manage what can potentially be a lot less configuration. > >> >> > >> >> > >> >> I’m not strongly attached to using an interface as the solution for > >> >> naming internal topics however, I feel it will give us more > flexibility > >> >> with future KIPs. > >> >> > >> >> I would appreciate getting feedback from you both. And if the > majority > >> >> feels like the configurations are easier I can change the proposal > to this. > >> >> > >> >> -- > >> >> Omnia > >> >> > >> >> > >> >> On Sat, May 15, 2021 at 5:59 PM Omnia Ibrahim < > o.g.h.ibra...@gmail.com> > >> >> wrote: > >> >> > >> >>> Hey Gwen, > >> >>> Thanks for having a look into the KIP, regard your question > >> >>> > >> >>>> is there a concrete case that shows why configuration is more > >> >>>> complicated than implementing a plugin? > >> >>>> > >> >>> > >> >>> The case I had in mind is where an organisation with medium-to-large > >> >>> size have multiple teams running their own Kafka clusters without a > >> >>> centralized team that runs Kafka and enforce the same rule for > naming > >> >>> resources everywhere. > >> >>> In this environment, if the organization for example is trying to > use > >> >>> MM2 to migrate all their data into one place or the data is needed > on both > >> >>> clusters for any business use cases (like needs aggregate data from > >> >>> different clusters), adding extra config for each internal topic > will > >> >>> increase the amount of configuration need to run MM2 for this case. > >> >>> > >> >>> The other concerns I have in general are > >> >>> > >> >>> 1. The popular usage of MM2 is replicating topics between clusters > >> >>> running by the same team, in this case, it's most likely that if > this team > >> >>> has a naming topic's rule, this same rule will be applied to both > >> >>> replicated and some of the internal topics of MM2. If we added > config > >> >>> like connect to each internal topic then these customers will end > up adding > >> >>> 4 configs just to handle the same naming rule, 1 to include > customised > >> >>> replication policy for naming replicated topics + extra 3 configs > for the > >> >>> internals to match the same rule. > >> >>> > >> >>> 2. The replication policy plugin already implemented in the MM2, and > >> >>> many customers have already their own customised implementation > (there're > >> >>> few implementations flying around already for this policy in the > community) > >> >>> so we aren't adding extra configuration instead we are expanding the > >> >>> responsibility of the policy. > >> >>> > >> >>> Am happy to make it align with connect configs if others disagree > with > >> >>> my concerns, at the end my number one goal is to make it flexible > to name > >> >>> these topics. I opt-in for an interface based solution so I can > minimize > >> >>> the number of config customers need to add. > >> >>> > >> >>> Omnia > >> >>> > >> >>> > >> >>> On Thu, May 13, 2021 at 9:30 PM Gwen Shapira > <g...@confluent.io.invalid> > >> >>> wrote: > >> >>> > >> >>>> Hey, sorry for arriving late, but I have a question about the > rejected > >> >>>> alternative involving configuration. > >> >>>> > >> >>>> I may be missing something, but we are looking at 3 new configs > (one for > >> >>>> each topic). And this rejected alternative is basically identical > to > >> >>>> what > >> >>>> Connect already does (you can choose names for internal topics > using > >> >>>> configs). How is it too complex? As a user, configuring 3 fields > seems > >> >>>> much > >> >>>> simpler than implementing a class. As an admin, trusting to run > >> >>>> someone's > >> >>>> code is scary, but a config with topic name is very safe and easy > to > >> >>>> test > >> >>>> and trust. > >> >>>> > >> >>>> There is a discussion about how this gets more complicated in some > >> >>>> scenarios, but it was a bit abstract - is there a concrete case > that > >> >>>> shows > >> >>>> why configuration is more complicated than implementing a plugin? > >> >>>> > >> >>>> Gwen > >> >>>> > >> >>>> On Fri, Apr 30, 2021 at 2:30 AM Omnia Ibrahim < > o.g.h.ibra...@gmail.com> > >> >>>> wrote: > >> >>>> > >> >>>> > Hi Ryanne, Can you vote for it here > >> >>>> > https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html > as > >> >>>> well, > >> >>>> > please? > >> >>>> > > >> >>>> > On Fri, Apr 30, 2021 at 12:44 AM Ryanne Dolan < > ryannedo...@gmail.com> > >> >>>> > wrote: > >> >>>> > > >> >>>> > > Thanks Omnia. lgtm! > >> >>>> > > > >> >>>> > > Ryanne > >> >>>> > > > >> >>>> > > On Thu, Apr 29, 2021 at 10:50 AM Omnia Ibrahim < > >> >>>> o.g.h.ibra...@gmail.com> > >> >>>> > > wrote: > >> >>>> > > > >> >>>> > >> I updated the KIP > >> >>>> > >> > >> >>>> > >> On Thu, Apr 29, 2021 at 4:43 PM Omnia Ibrahim < > >> >>>> o.g.h.ibra...@gmail.com> > >> >>>> > >> wrote: > >> >>>> > >> > >> >>>> > >>> Sure, this would make it easier, we can make these functions > >> >>>> returns > >> >>>> > the > >> >>>> > >>> original behaviour (<clusterAlias>.checkpoints.internal, > >> >>>> > >>> "mm2-offset-syncs.<clusterAlias>.internal", heartbeat) > without > >> >>>> any > >> >>>> > >>> customisation using `replication.policy.separator` and use > the > >> >>>> > separator in > >> >>>> > >>> the DefaultReplicationPolicy > >> >>>> > >>> > >> >>>> > >>> On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan < > >> >>>> ryannedo...@gmail.com> > >> >>>> > >>> wrote: > >> >>>> > >>> > >> >>>> > >>>> Thanks Omnia, makes sense to me. > >> >>>> > >>>> > >> >>>> > >>>> > Customers who have their customised ReplicationPolicy will > >> >>>> need to > >> >>>> > >>>> add the definition of their internal topics naming > convention > >> >>>> > >>>> > >> >>>> > >>>> I wonder should we include default impls in the interface to > >> >>>> avoid > >> >>>> > that > >> >>>> > >>>> requirement? > >> >>>> > >>>> > >> >>>> > >>>> Ryanne > >> >>>> > >>>> > >> >>>> > >>>> On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim < > >> >>>> o.g.h.ibra...@gmail.com> > >> >>>> > >>>> wrote: > >> >>>> > >>>> > >> >>>> > >>>>> Hi Mickael and Ryanne, > >> >>>> > >>>>> I updated the KIP to add these methods to the > ReplicationPolicy > >> >>>> > >>>>> instead of an extra interface to simplify the changes. > Please > >> >>>> have a > >> >>>> > look > >> >>>> > >>>>> and let me know your thoughts. > >> >>>> > >>>>> > >> >>>> > >>>>> Thanks > >> >>>> > >>>>> > >> >>>> > >>>>> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim < > >> >>>> > o.g.h.ibra...@gmail.com> > >> >>>> > >>>>> wrote: > >> >>>> > >>>>> > >> >>>> > >>>>>> *(sorry forgot to Replay to All) * > >> >>>> > >>>>>> Hi Ryanne, > >> >>>> > >>>>>> It's a valid concern, I was trying to separate the > concerns of > >> >>>> > >>>>>> internal and replicated policy away from each other and to > >> >>>> make the > >> >>>> > code > >> >>>> > >>>>>> readable as extending ReplicationPolicy to manage both > >> >>>> internal and > >> >>>> > >>>>>> replicated topic is a bit odd. Am not against simplifying > >> >>>> things > >> >>>> > out to > >> >>>> > >>>>>> make ReplicationPolicy handling both at the end of the > day if > >> >>>> an > >> >>>> > MM2 user > >> >>>> > >>>>>> has a special naming convention for topics it will be > >> >>>> affecting both > >> >>>> > >>>>>> replicated and MM2 internal topics. > >> >>>> > >>>>>> > >> >>>> > >>>>>> For simplifying things we can extend `ReplicationPolicy` > to the > >> >>>> > >>>>>> following instead of adding an extra class > >> >>>> > >>>>>> > >> >>>> > >>>>>>> *public interface ReplicationPolicy {* > >> >>>> > >>>>>>> String topicSource(String topic); > >> >>>> > >>>>>>> String upstreamTopic(String topic); > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> */** Returns heartbeats topic name.*/ String > >> >>>> heartbeatsTopic();* > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> * /** Returns the offset-syncs topic for given cluster > >> >>>> alias. */ > >> >>>> > >>>>>>> String offsetSyncTopic(String targetAlias); /** > Returns > >> >>>> the > >> >>>> > name > >> >>>> > >>>>>>> checkpoint topic for given cluster alias. */ String > >> >>>> > >>>>>>> checkpointTopic(String sourceAlias); * > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> default String originalTopic(String topic) { > >> >>>> > >>>>>>> String upstream = upstreamTopic(topic); > >> >>>> > >>>>>>> if (upstream == null) { > >> >>>> > >>>>>>> return topic; > >> >>>> > >>>>>>> } else { > >> >>>> > >>>>>>> return originalTopic(upstream); > >> >>>> > >>>>>>> } > >> >>>> > >>>>>>> } > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> * /** Internal topics are never replicated. */ > >> >>>> > >>>>>>> isInternalTopic(String topic) *//the implementaion will > be > >> >>>> moved to > >> >>>> > >>>>>>> `DefaultReplicationPolicy` to handle both kafka topics > and MM2 > >> >>>> > internal > >> >>>> > >>>>>>> topics. > >> >>>> > >>>>>>> } > >> >>>> > >>>>>>> > >> >>>> > >>>>>> > >> >>>> > >>>>>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan < > >> >>>> ryannedo...@gmail.com > >> >>>> > > > >> >>>> > >>>>>> wrote: > >> >>>> > >>>>>> > >> >>>> > >>>>>>> Omnia, have we considered just adding methods to > >> >>>> ReplicationPolicy? > >> >>>> > >>>>>>> I'm reluctant to add a new class because, as Mickael > points > >> >>>> out, > >> >>>> > we'd need > >> >>>> > >>>>>>> to carry it around in client code. > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> Ryanne > >> >>>> > >>>>>>> > >> >>>> > >>>>>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison < > >> >>>> > >>>>>>> mickael.mai...@gmail.com> wrote: > >> >>>> > >>>>>>> > >> >>>> > >>>>>>>> Hi Omnia, > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> Thanks for the clarifications. > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> - I'm still a bit uneasy with the overlap between these > 2 > >> >>>> methods > >> >>>> > as > >> >>>> > >>>>>>>> currently `ReplicationPolicy.isInternalTopic` already > >> >>>> handles MM2 > >> >>>> > >>>>>>>> internal topics. Should we make it only handle Kafka > internal > >> >>>> > topics > >> >>>> > >>>>>>>> and `isMM2InternalTopic()` only handle MM2 topics? > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> - I'm not sure I understand what this method is used > for. > >> >>>> There > >> >>>> > are > >> >>>> > >>>>>>>> no > >> >>>> > >>>>>>>> such methods for the other 2 topics (offset-sync and > >> >>>> heartbeat). > >> >>>> > >>>>>>>> Also > >> >>>> > >>>>>>>> what happens if there are other MM2 instances using > different > >> >>>> > naming > >> >>>> > >>>>>>>> schemes in the same cluster. Do all instances have to > know > >> >>>> about > >> >>>> > the > >> >>>> > >>>>>>>> other naming schemes? What are the expected issues if > they > >> >>>> don't? > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> - RemoteClusterUtils is a client-side utility so it > does not > >> >>>> have > >> >>>> > >>>>>>>> access to the MM2 configuration. Since this new API can > >> >>>> affect the > >> >>>> > >>>>>>>> name of the checkpoint topic, it will need to be used > >> >>>> client-side > >> >>>> > >>>>>>>> too > >> >>>> > >>>>>>>> so users can find the checkpoint topic name. I had to > >> >>>> realized > >> >>>> > this > >> >>>> > >>>>>>>> was the case. > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> Thanks > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote: > >> >>>> > >>>>>>>> > > >> >>>> > >>>>>>>> > Hi Mickael, did you have some time to check my answer? > >> >>>> > >>>>>>>> > > >> >>>> > >>>>>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote: > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> Hi Mickael, > >> >>>> > >>>>>>>> >> Thanks for taking another look into the KIP, regards > your > >> >>>> > >>>>>>>> questions > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> - I believe we need both "isMM2InternalTopic" and > >> >>>> > >>>>>>>> `ReplicationPolicy.isInternalTopic` as > >> >>>> > `ReplicationPolicy.isInternalTopic` > >> >>>> > >>>>>>>> does check if a topic is Kafka internal topic, while > >> >>>> > `isMM2InternalTopic` > >> >>>> > >>>>>>>> is just focusing if a topic is MM2 internal topic or > >> >>>> not(which is > >> >>>> > >>>>>>>> heartbeat/checkpoint/offset-sync). The fact that the > >> >>>> default for > >> >>>> > MM2 > >> >>>> > >>>>>>>> internal topics matches > "ReplicationPolicy.isInternalTopic" > >> >>>> will > >> >>>> > not be an > >> >>>> > >>>>>>>> accurate assumption anymore once we implement this KIP. > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> - "isCheckpointTopic" will detect all checkpoint > topics > >> >>>> for all > >> >>>> > >>>>>>>> MM2 instances this is needed for "MirrorClient. > >> >>>> checkpointTopics" > >> >>>> > which > >> >>>> > >>>>>>>> originally check if the topic name ends with > >> >>>> > CHECKPOINTS_TOPIC_SUFFIX. So > >> >>>> > >>>>>>>> this method just to keep the same functionality that > >> >>>> originally > >> >>>> > exists in > >> >>>> > >>>>>>>> MM2 > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> - "checkpointTopic" is used in two places 1. At topic > >> >>>> creation > >> >>>> > >>>>>>>> in "MirrorCheckpointConnector.createInternalTopics" > which > >> >>>> use > >> >>>> > >>>>>>>> "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX" and > 2. At > >> >>>> > >>>>>>>> "MirrorClient.remoteConsumerOffsets" which is called by > >> >>>> > >>>>>>>> "RemoteClusterUtils.translateOffsets" the cluster alias > >> >>>> here > >> >>>> > referred to > >> >>>> > >>>>>>>> as "remoteCluster" where the topic name is > >> >>>> "remoteClusterAlias + > >> >>>> > >>>>>>>> CHECKPOINTS_TOPIC_SUFFIX" (which is an argument in > >> >>>> > RemoteClusterUtils, not > >> >>>> > >>>>>>>> a config) This why I called the variable cluster > instead of > >> >>>> > source and > >> >>>> > >>>>>>>> instead of using the config to figure out the cluster > aliases > >> >>>> > from config > >> >>>> > >>>>>>>> as we use checkpoints to keep `RemoteClusterUtils` > >> >>>> compatible for > >> >>>> > existing > >> >>>> > >>>>>>>> users. I see a benefit of just read the config a find > out the > >> >>>> > cluster > >> >>>> > >>>>>>>> aliases but on the other side, I'm not sure why > >> >>>> > "RemoteClusterUtils" > >> >>>> > >>>>>>>> doesn't get the name of the cluster from the properties > >> >>>> instead > >> >>>> > of an > >> >>>> > >>>>>>>> argument, so I decided to keep it just for > compatibility. > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> Hope these answer some of your concerns. > >> >>>> > >>>>>>>> >> Best > >> >>>> > >>>>>>>> >> Omnia > >> >>>> > >>>>>>>> >> > >> >>>> > >>>>>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison < > >> >>>> > >>>>>>>> mickael.mai...@gmail.com> wrote: > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> Hi Omnia, > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> Thanks for the updates. Sorry for the delay but I > have a > >> >>>> few > >> >>>> > >>>>>>>> more > >> >>>> > >>>>>>>> >>> small questions about the API: > >> >>>> > >>>>>>>> >>> - Do we really need "isMM2InternalTopic()"? There's > >> >>>> already > >> >>>> > >>>>>>>> >>> "ReplicationPolicy.isInternalTopic()". If so, we > need to > >> >>>> > >>>>>>>> explain the > >> >>>> > >>>>>>>> >>> difference between these 2 methods. > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> - Is "isCheckpointTopic()" expected to detect all > >> >>>> checkpoint > >> >>>> > >>>>>>>> topics > >> >>>> > >>>>>>>> >>> (for all MM2 instances) or only the ones for this > >> >>>> connector > >> >>>> > >>>>>>>> instance. > >> >>>> > >>>>>>>> >>> If it's the later, I wonder if we could do without > the > >> >>>> method. > >> >>>> > >>>>>>>> As this > >> >>>> > >>>>>>>> >>> interface is only called by MM2, we could first call > >> >>>> > >>>>>>>> >>> "checkpointTopic()" and check if that's equal to the > >> >>>> topic > >> >>>> > we're > >> >>>> > >>>>>>>> >>> checking. If it's the former, we don't really know > topic > >> >>>> names > >> >>>> > >>>>>>>> other > >> >>>> > >>>>>>>> >>> MM2 instances may be using! > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> - The 3 methods returning topic names have different > >> >>>> APIs: > >> >>>> > >>>>>>>> >>> "heartbeatsTopic()" takes no arguments, > >> >>>> "offsetSyncTopic()" > >> >>>> > >>>>>>>> takes the > >> >>>> > >>>>>>>> >>> target cluster alias and "checkpointTopic()" takes > >> >>>> > >>>>>>>> "clusterAlias" > >> >>>> > >>>>>>>> >>> (which one is it? source or target?). As the > interface > >> >>>> extends > >> >>>> > >>>>>>>> >>> Configurable, maybe we could get rid of all the > >> >>>> arguments and > >> >>>> > >>>>>>>> use the > >> >>>> > >>>>>>>> >>> config to find the cluster aliases. WDYT? > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> These are minor concerns, just making sure I fully > >> >>>> understand > >> >>>> > >>>>>>>> how the > >> >>>> > >>>>>>>> >>> API is expected to be used. Once these are cleared, > I'll > >> >>>> be > >> >>>> > >>>>>>>> happy to > >> >>>> > >>>>>>>> >>> vote for this KIP. > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> Thanks > >> >>>> > >>>>>>>> >>> > >> >>>> > >>>>>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote: > >> >>>> > >>>>>>>> >>> > > >> >>>> > >>>>>>>> >>> > Hi Mickael, > >> >>>> > >>>>>>>> >>> > Did you get time to review the changes to the > KIP? If > >> >>>> you > >> >>>> > >>>>>>>> okay with it could you vote for the KIP here ttps:// > >> >>>> > >>>>>>>> > www.mail-archive.com/dev@kafka.apache.org/msg113575.html? > >> >>>> > >>>>>>>> >>> > Thanks > >> >>>> > >>>>>>>> >>> > > >> >>>> > >>>>>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote: > >> >>>> > >>>>>>>> >>> >> > >> >>>> > >>>>>>>> >>> >> Hi Mickael, > >> >>>> > >>>>>>>> >>> >> 1) That's right the interface and default > >> >>>> implementation > >> >>>> > >>>>>>>> will in mirror-connect > >> >>>> > >>>>>>>> >>> >> 2) Renaming the interface should be fine too > >> >>>> especially if > >> >>>> > >>>>>>>> you planning to move other functionality related to the > >> >>>> creation > >> >>>> > there, I > >> >>>> > >>>>>>>> can edit this > >> >>>> > >>>>>>>> >>> >> > >> >>>> > >>>>>>>> >>> >> if you are okay with that please vote for the > KIP here > >> >>>> > >>>>>>>> > >> >>>> https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html > >> >>>> > >>>>>>>> >>> >> > >> >>>> > >>>>>>>> >>> >> > >> >>>> > >>>>>>>> >>> >> Thanks > >> >>>> > >>>>>>>> >>> >> Omnia > >> >>>> > >>>>>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison < > >> >>>> > >>>>>>>> mickael.mai...@gmail.com> wrote: > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> Hi Omnia, > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> Thank you for the reply, it makes sense. > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> A couple more comments: > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> 1) I'm assuming the new interface and default > >> >>>> > >>>>>>>> implementation will be > >> >>>> > >>>>>>>> >>> >>> in the mirror-client project? as the names of > some of > >> >>>> > these > >> >>>> > >>>>>>>> topics are > >> >>>> > >>>>>>>> >>> >>> needed by RemoteClusterUtils on the client-side. > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> 2) I'm about to open a KIP to specify where the > >> >>>> > >>>>>>>> offset-syncs topic is > >> >>>> > >>>>>>>> >>> >>> created by MM2. In restricted environments, we'd > >> >>>> prefer > >> >>>> > MM2 > >> >>>> > >>>>>>>> to only > >> >>>> > >>>>>>>> >>> >>> have read access to the source cluster and have > the > >> >>>> > >>>>>>>> offset-syncs on > >> >>>> > >>>>>>>> >>> >>> the target cluster. I think allowing to specify > the > >> >>>> > cluster > >> >>>> > >>>>>>>> where to > >> >>>> > >>>>>>>> >>> >>> create that topic would be a natural extension > of the > >> >>>> > >>>>>>>> interface you > >> >>>> > >>>>>>>> >>> >>> propose here. > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> So I wonder if your interface could be named > >> >>>> > >>>>>>>> InternalTopicsPolicy? > >> >>>> > >>>>>>>> >>> >>> That's a bit more generic than > >> >>>> InternalTopicNamingPolicy. > >> >>>> > >>>>>>>> That would > >> >>>> > >>>>>>>> >>> >>> also match the configuration setting, > >> >>>> > >>>>>>>> internal.topics.policy.class, > >> >>>> > >>>>>>>> >>> >>> you're proposing. > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> Thanks > >> >>>> > >>>>>>>> >>> >>> > >> >>>> > >>>>>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> wrote: > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > Hi Mickael, > >> >>>> > >>>>>>>> >>> >>> > Thanks for your feedback! > >> >>>> > >>>>>>>> >>> >>> > Regards your question about having more > >> >>>> configurations, > >> >>>> > I > >> >>>> > >>>>>>>> considered adding > >> >>>> > >>>>>>>> >>> >>> > configuration per each topic however this > meant > >> >>>> adding > >> >>>> > >>>>>>>> more configurations > >> >>>> > >>>>>>>> >>> >>> > for MM2 which already have so many, also the > more > >> >>>> > >>>>>>>> complicated and advanced > >> >>>> > >>>>>>>> >>> >>> > replication pattern you have between clusters > the > >> >>>> more > >> >>>> > >>>>>>>> configuration lines > >> >>>> > >>>>>>>> >>> >>> > will be added to your MM2 config which isn't > going > >> >>>> to be > >> >>>> > >>>>>>>> pretty if you > >> >>>> > >>>>>>>> >>> >>> > don't have the same topics names across your > >> >>>> clusters. > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > Also, it added more complexity to the > >> >>>> implementation as > >> >>>> > >>>>>>>> MM2 need to > >> >>>> > >>>>>>>> >>> >>> > 1- identify if a topic is checkpoints so we > could > >> >>>> list > >> >>>> > >>>>>>>> the checkpoints > >> >>>> > >>>>>>>> >>> >>> > topics in MirrorMaker 2 utils as one cluster > could > >> >>>> have > >> >>>> > X > >> >>>> > >>>>>>>> numbers > >> >>>> > >>>>>>>> >>> >>> > checkpoints topics if it's connected to X > >> >>>> clusters, this > >> >>>> > >>>>>>>> is done right now > >> >>>> > >>>>>>>> >>> >>> > by listing any topic with suffix > >> >>>> > `.checkpoints.internal`. > >> >>>> > >>>>>>>> This could be > >> >>>> > >>>>>>>> >>> >>> > done by add `checkpoints.topic.suffix` config > but > >> >>>> this > >> >>>> > >>>>>>>> would make an > >> >>>> > >>>>>>>> >>> >>> > assumption that checkpoints will always have a > >> >>>> suffix > >> >>>> > >>>>>>>> also having a suffix > >> >>>> > >>>>>>>> >>> >>> > means that we may need a separator as well to > >> >>>> > concatenate > >> >>>> > >>>>>>>> this suffix with > >> >>>> > >>>>>>>> >>> >>> > a prefix to identify source cluster name. > >> >>>> > >>>>>>>> >>> >>> > 2- identify if a topic is internal, so it > >> >>>> shouldn't be > >> >>>> > >>>>>>>> replicated or track > >> >>>> > >>>>>>>> >>> >>> > checkpoints for it, right now this is > relaying on > >> >>>> > >>>>>>>> disallow topics with > >> >>>> > >>>>>>>> >>> >>> > `.internal` suffix to be not replicated and > not > >> >>>> tracked > >> >>>> > >>>>>>>> in checkpoints but > >> >>>> > >>>>>>>> >>> >>> > with making topics configurable we need a way > to > >> >>>> define > >> >>>> > >>>>>>>> what is an internal > >> >>>> > >>>>>>>> >>> >>> > topic. This could be done by making using a > list > >> >>>> of all > >> >>>> > >>>>>>>> internal topics > >> >>>> > >>>>>>>> >>> >>> > have been entered to the configuration. > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > So having an interface seemed easier and also > give > >> >>>> more > >> >>>> > >>>>>>>> flexibility for > >> >>>> > >>>>>>>> >>> >>> > users to define their own topics name, define > what > >> >>>> is > >> >>>> > >>>>>>>> internal topic means, > >> >>>> > >>>>>>>> >>> >>> > how to find checkpoints topics and it will be > one > >> >>>> line > >> >>>> > >>>>>>>> config for each > >> >>>> > >>>>>>>> >>> >>> > herder, also it more consistence with MM2 > code as > >> >>>> MM2 > >> >>>> > >>>>>>>> config have > >> >>>> > >>>>>>>> >>> >>> > TopicFilter, ReplicationPolicy, GroupFilter, > etc as > >> >>>> > >>>>>>>> interface and they can > >> >>>> > >>>>>>>> >>> >>> > be overridden by providing a custom > implementation > >> >>>> for > >> >>>> > >>>>>>>> them or have some > >> >>>> > >>>>>>>> >>> >>> > config that change their default > implementations. > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > Hope this answer your question. I also > updated the > >> >>>> KIP > >> >>>> > to > >> >>>> > >>>>>>>> add this to the > >> >>>> > >>>>>>>> >>> >>> > rejected solutions. > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison > < > >> >>>> > >>>>>>>> mickael.mai...@gmail.com> > >> >>>> > >>>>>>>> >>> >>> > wrote: > >> >>>> > >>>>>>>> >>> >>> > > >> >>>> > >>>>>>>> >>> >>> > > Hi Omnia, > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> >>> >>> > > Thanks for the KIP. Indeed being able to > >> >>>> configure > >> >>>> > >>>>>>>> MM2's internal > >> >>>> > >>>>>>>> >>> >>> > > topic names would be a nice improvement. > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> >>> >>> > > Looking at the KIP, I was surprised you > propose > >> >>>> an > >> >>>> > >>>>>>>> interface to allow > >> >>>> > >>>>>>>> >>> >>> > > users to specify names. Have you considered > >> >>>> making > >> >>>> > >>>>>>>> names changeable > >> >>>> > >>>>>>>> >>> >>> > > via configurations? If so, we should > definitely > >> >>>> > mention > >> >>>> > >>>>>>>> it in the > >> >>>> > >>>>>>>> >>> >>> > > rejected alternatives as it's the first > method > >> >>>> that > >> >>>> > >>>>>>>> comes to mind. > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> >>> >>> > > I understand an interface gives a lot of > >> >>>> flexibility > >> >>>> > >>>>>>>> but I'd expect > >> >>>> > >>>>>>>> >>> >>> > > topic names to be relatively simple and > known in > >> >>>> > >>>>>>>> advance in most > >> >>>> > >>>>>>>> >>> >>> > > cases. > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> >>> >>> > > I've not checked all use cases but > something like > >> >>>> > below > >> >>>> > >>>>>>>> felt appropriate: > >> >>>> > >>>>>>>> >>> >>> > > clusters = primary,backup > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> > primary->backup.offsets-sync.topic=backup.mytopic-offsets- > >> >>>> sync > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia > Ibrahim < > >> >>>> > >>>>>>>> o.g.h.ibra...@gmail.com> > >> >>>> > >>>>>>>> >>> >>> > > wrote: > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > Hey everyone, > >> >>>> > >>>>>>>> >>> >>> > > > Please take a look at KIP-690: > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> > >> >>>> > > >> >>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > Thanks for your feedback and support. > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > Omnia > >> >>>> > >>>>>>>> >>> >>> > > > > >> >>>> > >>>>>>>> >>> >>> > > > >> >>>> > >>>>>>>> > >> >>>> > >>>>>>> > >> >>>> > > >> >>>> > >> >>>> > >> >>>> -- > >> >>>> Gwen Shapira > >> >>>> Engineering Manager | Confluent > >> >>>> 650.450.2760 | @gwenshap > >> >>>> Follow us: Twitter | blog > >> >>>> > >> >>> > >