mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873586510


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
         waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
 
-        waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
-
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
 
         primaryClient.close();
         backupClient.close();
 
         // Failback consumer group to primary cluster
-        try (Consumer<byte[], byte[]> backupConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
-            backupConsumer.assign(primaryOffsets.keySet());
-            primaryOffsets.forEach(backupConsumer::seek);
-            backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            backupConsumer.commitAsync();
-
-            assertTrue(backupConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream offset.");
-            assertTrue(backupConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset.");
-            assertTrue(backupConsumer.position(
-                new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
-            assertTrue(backupConsumer.position(
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
+            primaryConsumer.assign(primaryOffsets.keySet());
+            primaryOffsets.forEach(primaryConsumer::seek);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.commitAsync();
+
+            assertTrue(primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset.");

Review Comment:
   Same as above: `remoteConsumerOffsets()` now only returns offsets for remote topics mirrored from the backup cluster, and these are prefixed with `backup.`.
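   For illustration only, the invariant described above can be sketched in plain Java. Topic names as strings stand in for the `TopicPartition` keys returned by `MirrorClient.remoteConsumerOffsets()`, `allFromBackupCluster` is a hypothetical helper, and the `backup.` prefix assumes MirrorMaker 2's default `ReplicationPolicy`, which renames mirrored topics to `<sourceClusterAlias>.<topic>`:
   
   ```java
   import java.util.Set;
   
   public class RemoteOffsetsSketch {
       // Hypothetical check: every translated offset should belong to a topic
       // mirrored from the given source cluster, i.e. carry the "<alias>." prefix
       // that DefaultReplicationPolicy prepends to remote topic names.
       static boolean allFromBackupCluster(Set<String> remoteTopics, String clusterAlias) {
           String prefix = clusterAlias + ".";
           return remoteTopics.stream().allMatch(t -> t.startsWith(prefix));
       }
   
       public static void main(String[] args) {
           // A mirrored topic passes the check; a local (un-prefixed) topic does not,
           // which is why the removed assertion on "test-topic-1" could never hold.
           System.out.println(allFromBackupCluster(Set.of("backup.test-topic-1"), "backup")); // true
           System.out.println(allFromBackupCluster(Set.of("test-topic-1"), "backup"));        // false
       }
   }
   ```
   
   This is why the `waitForCondition` on `test-topic-1` and the corresponding `position` assertions were dropped: the translated offsets map simply never contains the un-prefixed upstream partition.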


