gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107628693


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + 
cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka 
Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> 
waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String 
remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new 
AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = 
client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, 
Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, 
i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", 
consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s 
for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure 
the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS 
* topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; 
partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), 
OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                     
adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = 
consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = 
consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, 
CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> 
l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        
.mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroupOffsets.get(tp).offset() <= 
endOffsets.get(tp).offset(),
+                                "Consumer group committed downstream offsets 
beyond the log end, this would lead to negative lag metrics"
+                        );
+                    }
+                }
                 boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
+                        ? totalEndOffsets == expectedTotalOffsets
+                        : totalEndOffsets >= expectedTotalOffsets;
+
+                boolean consumerGroupOffsetsMatch = exactOffsetTranslation
+                        ? totalConsumerGroupOffsets == expectedTotalOffsets
+                        : totalConsumerGroupOffsets >= expectedTotalOffsets;
                 // make sure the consumer group offsets are synced to expected 
number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetsMatch;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
         }
     }
 
+    protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster 
cluster, String offsetSyncsTopic, String topic, int partitions) throws 
InterruptedException {
+        waitForTopicCreated(cluster, offsetSyncsTopic);
+        // Insert a large number of checkpoint records into the offset syncs 
topic to simulate
+        // a long-lived MM2 instance that has replicated many offsets in the 
past.
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            TopicPartition tp = new TopicPartition(topic, partition);
+            OffsetSync sync = new OffsetSync(tp, 0L, 0L);
+            records.add(new ProducerRecord<>(offsetSyncsTopic, 
sync.recordKey(), sync.recordValue()));
+        }
+        Map<String, Object> producerProps = new HashMap<>();
+        int sentRecords = 0;
+        try (KafkaProducer<byte[], byte[]> producer = 
cluster.kafka().createProducer(producerProps)) {
+            // Try to ensure that the contents of the offset syncs topic is 
too large to read before the checkpoint
+            // interval passes, so that the first checkpoint would take place 
before reading the whole contents of the
+            // sync topic. Experimentally, this test passes with <2x, and 
fails with >5x, without a fix for KAFKA-13659.
+            double consumeEfficiency = 10;
+            long deadline = System.currentTimeMillis() + (long) 
(consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS);
+            int nRecords = records.size();
+            while (System.currentTimeMillis() < deadline) {
+                producer.send(records.get(sentRecords % nRecords));
+                sentRecords++;
+            }
+            producer.flush();
+        }
+        log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic);
+    }

Review Comment:
   I'll remove insertDummyOffsetSyncs, but leave the testRestartReplication, as 
it is still testing a new situation: restarting the connectors mid-replication.
   It won't target this specific bug without the `insertDummyOffsetSyncs`, but 
could still add value.
   
   I've added unit tests for OffsetSyncStore that address the bug (translating 
before start finishes) that should hopefully compensate for this test's smaller 
scope.
   
   I looked into testing MirrorCheckpointTask, and it looks too involved to 
address in this PR. I think it would be more appropriate to follow up later 
with some refactoring & testing for the checkpoint task.
   I feel confident from the results of the insertDummyOffsetSyncs test that:
   1. We are addressing the situation where the offset syncs topic is very large
   2. We are not introducing a severe performance regression, as the 
consumeEfficiency is very high.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to