mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873670168


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException {
+        String consumerGroupName = "consumer-group-no-checkpoints";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
+        // ensure there are some records in the topic on the source cluster
+        produceMessages(primary, "test-topic-1");
+
+        // warm up consumers before starting the connectors, so we don't need to wait for discovery
+        warmUpConsumer(consumerProps);
+
+        // one-way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // make sure the topics are created in the backup cluster
+        waitForTopicCreated(backup, remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS));
+        waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", PRIMARY_CLUSTER_ALIAS));
+
+        // commit some offsets for both topics in the source cluster
+        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+        TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+        try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    e -> new OffsetAndMetadata(e.getValue())
+                            ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
+        // Only test-topic-1 should have translated offsets because we've not yet mirrored any records for test-topic-no-records
+        MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
+                    consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
+            return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+        }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
+
+        // Send some records to topic-no-records in the source cluster

Review Comment:
   I've renamed it to `test-topic-no-checkpoints` so it matches the test/group names
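
   A minimal sketch (not code from the PR) of the naming convention the rename aims for: deriving the topic and group names from one shared suffix so they cannot drift apart. The `NamingSketch` class, `SUFFIX` constant, and derived names are hypothetical illustrations; only the `consumer-group-no-checkpoints` group name comes from the test above.

   ```java
   // Hypothetical sketch: one suffix keeps the test's topic and group names aligned.
   public class NamingSketch {
       // shared suffix, matching the test's "consumer-group-no-checkpoints" group
       static final String SUFFIX = "no-checkpoints";
       // derived names stay consistent with each other by construction
       static final String TOPIC = "test-topic-" + SUFFIX;
       static final String GROUP = "consumer-group-" + SUFFIX;

       public static void main(String[] args) {
           System.out.println(TOPIC);
           System.out.println(GROUP);
       }
   }
   ```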



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
