Hi Ryanne/Josh,

I'm working on active-active MirrorMaker and ran into a problem while
translating consumer offsets from source cluster A to destination cluster B.
Any pointers would be helpful.

Cluster A
Cluster name: A
Topic name: testA
Consumer group name: mm-testA-consumer

Cluster B
Cluster name: B
Topic name: source.testA
Consumer group name: mm-testA-consumer

Using the API below, I would like to translate consumer offsets from cluster A
to cluster B for the consumer group mm-testA-consumer:


        Map<String, Object> prop = new HashMap<>();
        String bootsStrapServer = "clusterB:9092";
        String topic = "source.testA";
        String groupId = "mm-testA-consumer";
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootsStrapServer);
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic));
        try {
            Map<TopicPartition, OffsetAndMetadata> newOffsets =
                RemoteClusterUtils.translateOffsets(prop, "A", groupId, Duration.ofMillis(55500));

            System.out.println(newOffsets.toString());
            newOffsets.forEach((topicPartition, offsetAndMetadata) ->
                consumer.seek(topicPartition, offsetAndMetadata));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key:"+record.key()+"Value:"+record.value()+"Offset:"+record.offset()+"Partition:"+record.partition());
                System.out.println("Key:"+record.key()+"Value:"+record.value()+"  Offset:"+record.offset()+"Partition:"+record.partition());
            }
        }
    }

I'm getting the error below:
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 3 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 4 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 5 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 6 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 7 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 8 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-null-2, groupId=null] Error while fetching metadata with correlation id 9 : {A.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION}
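
For reference, the topic named in these warnings matches MM2's default naming for checkpoint topics: the remote cluster alias followed by ".checkpoints.internal". The following is only a minimal sketch of that derivation, assuming the default naming; the class and method names are hypothetical and not part of the Kafka API.

```java
// Hypothetical helper, not part of Kafka. It only models the default naming
// of MM2 checkpoint topics: "<remote cluster alias>.checkpoints.internal".
final class CheckpointTopicName {
    private static final String SUFFIX = ".checkpoints.internal";

    // Returns the checkpoint topic name that corresponds to the given
    // remote cluster alias.
    static String forAlias(String remoteClusterAlias) {
        return remoteClusterAlias + SUFFIX;
    }

    public static void main(String[] args) {
        // "A" is the alias passed to translateOffsets() in the code above.
        System.out.println(forAlias("A"));
    }
}
```

With the alias "A" this gives the name seen in the warnings, which suggests the client is looking for that topic on the cluster named in the bootstrap servers setting (clusterB:9092 in the code above) and not finding it there.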


Any suggestions would be helpful.

On 8/21/20, 7:52 AM, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:

    Josh, make sure there is a consumer in cluster B subscribed to A.topic1.
    Wait a few seconds for a checkpoint to appear upstream on cluster A, and
    then translateOffsets() will give you the correct offsets.

    By default MM2 will block consumers that look like kafka-console-consumer,
    so make sure you specify a custom group ID when testing this.

    Ryanne

    On Thu, Aug 20, 2020, 11:21 AM Josh C <josh.fl.c...@gmail.com> wrote:

    > Thanks again Ryanne, I didn't realize that MM2 would handle that.
    >
    > However, I'm unable to mirror the remote topic back to the source cluster
    > by adding it to the topic whitelist. I've also tried to update the topic
    > blacklist and remove ".*\.replica" (since the blacklists take precedence
    > over the whitelists), but that doesn't seem to be doing much either? Is
    > there something else I should be aware of in the mm2.properties file?
    >
    > Appreciate all your help!
    >
    > Josh
    >
    > On Wed, Aug 19, 2020 at 12:55 PM Ryanne Dolan <ryannedo...@gmail.com>
    > wrote:
    >
    > > Josh, if you have two clusters with bidirectional replication, you only
    > > get two copies of each record. MM2 won't replicate the data "upstream",
    > > cuz it knows it's already there. In particular, MM2 knows not to create
    > > topics like B.A.topic1 on cluster A, as this would be an unnecessary
    > > cycle.
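
The naming and cycle rule described in this paragraph can be illustrated with a small sketch. It is not the MM2 implementation: the class and method names are hypothetical, and it assumes the default "<source alias>.<topic>" naming and topic names whose only dots come from alias prefixes.

```java
// Hypothetical sketch, not the MM2 implementation. It models the default
// "<source alias>.<topic>" naming of mirrored topics and a simple cycle check.
final class RemoteTopicSketch {
    // Name that a topic from cluster `sourceAlias` gets on another cluster.
    static String remoteName(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // True if mirroring `topic` to cluster `targetAlias` would send the data
    // back to a cluster it has already passed through.
    static boolean isCycle(String topic, String targetAlias) {
        String rest = topic;
        int dot = rest.indexOf('.');
        while (dot >= 0) {
            // The prefix before the first dot is treated as a cluster alias.
            String alias = rest.substring(0, dot);
            if (alias.equals(targetAlias)) {
                return true;
            }
            rest = rest.substring(dot + 1);
            dot = rest.indexOf('.');
        }
        return false;
    }

    public static void main(String[] args) {
        String onB = remoteName("A", "topic1"); // name of A's topic1 on cluster B
        System.out.println(onB + " -> A is a cycle: " + isCycle(onB, "A"));
        System.out.println("topic1 -> A is a cycle: " + isCycle("topic1", "A"));
    }
}
```

Under these assumptions, mirroring B's A.topic1 back to A is flagged as a cycle, so B.A.topic1 is never created on A, while B's own topic1 is mirrored to A normally.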
    > >
    > > > is there a reason for MM2 not emitting checkpoint data for the source
    > > > topic AND the remote topic
    > >
    > > No, not really! I think it would be surprising if one-directional flows
    > > insisted on writing checkpoints both ways -- but it's also surprising
    > > that you need to explicitly allow a remote topic to be checkpointed. I'd
    > > support changing this, fwiw.
    > >
    > > Ryanne
    > >
    > > On Wed, Aug 19, 2020 at 2:30 PM Josh C <josh.fl.c...@gmail.com> wrote:
    > >
    > > > Sorry, correction -- I am realizing now it would be 3 copies of the
    > > > same topic data as A.topic1 has different data than B.topic1. However,
    > > > that would still be 3 copies as opposed to just 2 with something like
    > > > topic1 and A.topic1.
    > > >
    > > > As well, if I were to explicitly replicate the remote topic back to the
    > > > source cluster by adding it to the topic whitelist, would I also need
    > > > to update the topic blacklist and remove ".*\.replica" (since the
    > > > blacklists take precedence over the whitelists)?
    > > >
    > > > Josh
    > > >
    > > > On Wed, Aug 19, 2020 at 11:46 AM Josh C <josh.fl.c...@gmail.com> wrote:
    > > >
    > > > > Thanks for the clarification Ryanne. In the context of active/active
    > > > > clusters, does this mean there would be 6 copies of the same topic
    > > > > data?
    > > > >
    > > > > A topics:
    > > > > - topic1
    > > > > - B.topic1
    > > > > - B.A.topic1
    > > > >
    > > > > B topics:
    > > > > - topic1
    > > > > - A.topic1
    > > > > - A.B.topic1
    > > > >
    > > > > Out of curiosity, is there a reason for MM2 not emitting checkpoint
    > > > > data for the source topic AND the remote topic as a pair as opposed to
    > > > > having to explicitly replicate the remote topic back to the source
    > > > > cluster just to have the checkpoints emitted upstream?
    > > > >
    > > > > Josh
    > > > >
    > > > > On Wed, Aug 19, 2020 at 6:16 AM Ryanne Dolan <ryannedo...@gmail.com>
    > > > > wrote:
    > > > >
    > > > >> Josh, yes it's possible to migrate the consumer group back to the
    > > > >> source topic, but you need to explicitly replicate the remote topic
    > > > >> back to the source cluster -- otherwise no checkpoints will flow
    > > > >> "upstream":
    > > > >>
    > > > >> A->B.topics=test1
    > > > >> B->A.topics=A.test1
    > > > >>
    > > > >> After the first checkpoint is emitted upstream,
    > > > >> RemoteClusterUtils.translateOffsets() will translate B's A.test1
    > > > >> offsets into A's test1 offsets for you.
    > > > >>
    > > > >> Ryanne
    > > > >>
    > > > >> On Tue, Aug 18, 2020 at 5:56 PM Josh C <josh.fl.c...@gmail.com> wrote:
    > > > >>
    > > > >> > Hi there,
    > > > >> >
    > > > >> > I'm currently exploring MM2 and having some trouble with the
    > > > >> > RemoteClusterUtils.translateOffsets() method. I have been
    > > > >> > successful in migrating a consumer group from the source cluster to
    > > > >> > the target cluster, but was wondering how I could migrate this
    > > > >> > consumer group back to the original source topic?
    > > > >> >
    > > > >> > It is my understanding that there isn't any checkpoint data being
    > > > >> > emitted for this consumer group since it is consuming from a
    > > > >> > mirrored topic in the target cluster. I'm currently getting an empty
    > > > >> > map since there isn't any checkpoint data for
    > > > >> > 'target.checkpoints.internal' in the source cluster. So, I was
    > > > >> > wondering how would I get these new translated offsets to migrate
    > > > >> > the consumer group back to the source cluster?
    > > > >> >
    > > > >> > Please let me know if my question was unclear or if you require
    > > > >> > further clarification! Appreciate the help.
    > > > >> >
    > > > >> > Thanks,
    > > > >> > Josh
    > > > >> >
    > > > >>
    > > > >
    > > >
    > >
    >


This e-mail and any files transmitted with it are for the sole use of the 
intended recipient(s) and may contain confidential and privileged information. 
If you are not the intended recipient(s), please reply to the sender and 
destroy all copies of the original message. Any unauthorized review, use, 
disclosure, dissemination, forwarding, printing or copying of this email, 
and/or any action taken in reliance on the contents of this e-mail is strictly 
prohibited and may be unlawful. Where permitted by applicable law, this e-mail 
and other e-mail communications sent to and from Cognizant e-mail addresses may 
be monitored.
