ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r485363204
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java ########## @@ -396,27 +330,67 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( "group.id", "consumer-group-1"), "test-topic-2")) { // we need to wait for consuming all the records for MM2 replicating the expected offsets - waitForConsumingAllRecords(consumer1); + waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED); } // create a consumer at backup cluster with same consumer group Id to consume old and new topic consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2"); - waitForConsumerGroupOffsetSync(consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), "consumer-group-1"); + waitForConsumerGroupOffsetSync(backup, consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), + "consumer-group-1", NUM_RECORDS_PRODUCED); records = consumer.poll(Duration.ofMillis(500)); // similar reasoning as above, no more records to consume by the same consumer group at backup cluster assertEquals("consumer record size is not zero", 0, records.count()); consumer.close(); - } + + @Test + public void testWithBrokerRestart() throws InterruptedException { + // test with a higher number of records + int numRecords = NUM_RECORDS_PRODUCED * 100; + + produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords); - private void deleteAllTopics(EmbeddedKafkaCluster cluster) { - Admin client = cluster.createAdminClient(); - try { - client.deleteTopics(client.listTopics().names().get()); - } catch (Throwable e) { - } + // one way replication from primary to backup + mm2Props.put("backup->primary.enabled", "false"); + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, SOURCE_CONNECTOR, mm2Config, "primary", "backup"); + + // have to sleep a little for MM to be ready for the following the kafka broker restart + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + + // restart kafka broker at backup cluster + restartKafkaBroker(backup); + + Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo( + Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1"); + // verify the consumption equals to produce + waitForConsumingAllRecords(consumer, numRecords); + consumer.commitAsync(); + + // produce another set of records + produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords); + // restart kafka broker at primary cluster + restartKafkaBroker(primary); + // verify the consumption equals to produce + waitForConsumingAllRecords(consumer, numRecords); + + consumer.close(); + } + + void createTopics() { + // to verify topic config will be sync-ed across clusters + Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); + // create these topics before starting the connectors so we don't need to wait for discovery + primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig); Review comment: when creating `test-topic-1` topic on primary cluster, add a topic config. Later on, we will check if the config is synced from primary to backup cluster. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org