asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306298



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws 
InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", 
"backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, 
"backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary 
cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
"consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", 
consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", 
consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", 
consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", 
consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", 
"backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", 
primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", 
primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", 
primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", 
primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = 
consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be 
consumed after failback
+        assertEquals("Data was consumed from partitions: " + 
messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + 
i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws 
InterruptedException {
+        String consumerGroupName = 
"consumer-group-testReplicationWithEmptyPartition";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");
+            }};
+
+        // create topics
+        String topic = "test-topic-empty";
+        String primaryTopicReplica = "primary." + topic;
+        String backupTopicReplica = "backup." + topic;
+        primary.kafka().createTopic(topic, NUM_PARTITIONS);
+        primary.kafka().createTopic(backupTopicReplica, 1);
+        backup.kafka().createTopic(topic, NUM_PARTITIONS);
+        backup.kafka().createTopic(primaryTopicReplica, 1);
+
+        // Consume, from the primary cluster, before starting the connectors 
so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic, 
backupTopicReplica);
+        consumeAllMessages(consumer, 0);
+
+        // produce to all test-topic-empty's partitions *but the last one*, on 
the primary cluster
+        produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
+
+        // Consume all messages
+        consumeAllMessages(consumer, NUM_RECORDS_PER_PARTITION * 
(NUM_PARTITIONS - 1));
+        consumer.close();
+
+        // Consumer group offsets after consumption: the topic's last partition 
doesn't have data yet, so
+        // its committed offset is 0. All of the topic's other partitions should have 
an offset equal to NUM_RECORDS_PER_PARTITION.
+        // backupTopicReplica still has a single empty partition, since MM2 is 
not yet started, and its record offset is 0.
+
+        mm2Config = new MirrorMakerConfig(mm2Props);

Review comment:
       Thanks, @edoardocomar . I've addressed this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to