ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r485363204



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
##########
@@ -396,27 +330,67 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
             "group.id", "consumer-group-1"), "test-topic-2")) {
             // we need to wait until all records are consumed so that MM2 replicates the expected offsets
-            waitForConsumingAllRecords(consumer1);
+            waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at the backup cluster with the same consumer group ID to consume the old and new topics
         consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
             "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
 
-        waitForConsumerGroupOffsetSync(consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), "consumer-group-1");
+        waitForConsumerGroupOffsetSync(backup, consumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+            "consumer-group-1", NUM_RECORDS_PRODUCED);
 
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above: there should be no more records for the same consumer group to consume at the backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
-
     }
+    
+    @Test
+    public void testWithBrokerRestart() throws InterruptedException {
+        // test with a higher number of records
+        int numRecords = NUM_RECORDS_PRODUCED * 100;
+        
+        produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords);
 
-    private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
-        Admin client = cluster.createAdminClient();
-        try {
-            client.deleteTopics(client.listTopics().names().get());
-        } catch (Throwable e) {
-        }
+        // one way replication from primary to backup
+        mm2Props.put("backup->primary.enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+       
+        waitUntilMirrorMakerIsRunning(backup, SOURCE_CONNECTOR, mm2Config, "primary", "backup");
+        
+        // sleep briefly so that MM2 is ready before the following kafka broker restart
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
+        // restart kafka broker at backup cluster
+        restartKafkaBroker(backup);
+        
+        Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
+                Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
+        // verify that the number of records consumed equals the number produced
+        waitForConsumingAllRecords(consumer, numRecords);
+        consumer.commitAsync();
+        
+        // produce another set of records
+        produceRecords(Arrays.asList(primary), Arrays.asList("test-topic-1"), numRecords);
+        // restart kafka broker at primary cluster
+        restartKafkaBroker(primary);
+        // verify that the number of records consumed equals the number produced
+        waitForConsumingAllRecords(consumer, numRecords);
+        
+        consumer.close();
+    }
+    
+    void createTopics() {
+        // to verify that topic configs are synced across clusters
+        Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        // create these topics before starting the connectors so we don't need to wait for discovery
+        primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig);

Review comment:
       When creating the `test-topic-1` topic on the primary cluster, add a topic config. Later on, we will check whether that config is synced from the primary cluster to the backup cluster.
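
       For reference, a minimal sketch of how that check could look on the backup side, assuming the replicated topic is named `primary.test-topic-1` and the enclosing test declares `throws Exception` (the `backupAdmin` variable name is illustrative, not part of this PR):

```java
// assumes imports: org.apache.kafka.clients.admin.Admin, org.apache.kafka.clients.admin.Config,
// org.apache.kafka.common.config.ConfigResource, org.apache.kafka.common.config.TopicConfig
try (Admin backupAdmin = backup.kafka().createAdminClient()) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "primary.test-topic-1");
    // fetch the replicated topic's config from the backup cluster
    Config config = backupAdmin.describeConfigs(Collections.singleton(resource))
        .values().get(resource).get();
    // the cleanup.policy set on the primary topic should have been synced by MM2
    assertEquals("topic config was not synced", TopicConfig.CLEANUP_POLICY_COMPACT,
        config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value());
}
```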



