asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496306777
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException { waitForCondition(() -> { try { - return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", + return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)); } catch (Throwable e) { return false; } }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); - Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", + Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)); - + // Failback consumer group to primary cluster - consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1")); - consumer2.assign(primaryOffsets.keySet()); - primaryOffsets.forEach(consumer2::seek); - consumer2.poll(Duration.ofMillis(500)); - - assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0); - assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0); - assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position( - new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); - assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position( - new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); - - consumer2.close(); - + primaryConsumer = primary.kafka().createConsumer(consumerProps); + primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1")); + seek(primaryConsumer, primaryOffsets); + 
consumeAllMessages(primaryConsumer, 0); + + assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0); + assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0); + assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position( + new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); + assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position( + new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0); + // If offset translation was successful we expect no messages to be consumed after failback + assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size()); + primaryConsumer.close(); + // create more matching topics primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS); - for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { - primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i); - backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i); + produceMessages(primary, "test-topic-2", "message-3-", 1); + produceMessages(backup, "test-topic-3", "message-4-", 1); + + assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION, + primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count()); + assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION, + backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count()); + + assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION, + 
+ primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count()); + assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION, + backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); + } + + @Test + public void testReplicationWithEmptyPartition() throws InterruptedException { Review comment: Thanks, @mimaison. I've changed the test to simplify it as per your suggestion. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org