C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427


##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -392,6 +400,17 @@ protected Consumer<K, V> createConsumer() {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Test whether a topic partition should be read by this log.
+     * <p>Overridden by subclasses when only a subset of the assigned partitions should be read into memory.
+     * By default, this will read all partitions.

Review Comment:
   Some nits:
   
   ```suggestion
        * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
        * for every partition found in the log's backing topic.
        * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
        * should be read into memory. By default, all partitions are read.
   ```
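
   For context, the kind of subclass override this hook enables might look roughly like the following. This is only an illustrative sketch: the `readPartition` name and signature are assumptions, since the excerpt above cuts off before the actual method declaration.

   ```java
   // Hypothetical subclass override of the KafkaBasedLog hook described above:
   // only load partition 0 of the backing topic into memory. The readPartition
   // name/signature are assumed here, not taken from the diff above.
   @Override
   protected boolean readPartition(TopicPartition topicPartition) {
       return topicPartition.partition() == 0;
   }
   ```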



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, may be null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                     adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                                "Consumer group committed downstream offsets beyond the log end, this would lead to negative lag metrics"
+                        );
+                    }
+                }
                 boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
+                        ? totalEndOffsets == expectedTotalOffsets
+                        : totalEndOffsets >= expectedTotalOffsets;
+
+                boolean consumerGroupOffsetsMatch = exactOffsetTranslation
+                        ? totalConsumerGroupOffsets == expectedTotalOffsets
+                        : totalConsumerGroupOffsets >= expectedTotalOffsets;
                // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetsMatch;
            }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster cluster, String offsetSyncsTopic, String topic, int partitions) throws InterruptedException {
+        waitForTopicCreated(cluster, offsetSyncsTopic);
+        // Insert a large number of checkpoint records into the offset syncs topic to simulate
+        // a long-lived MM2 instance that has replicated many offsets in the past.
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            TopicPartition tp = new TopicPartition(topic, partition);
+            OffsetSync sync = new OffsetSync(tp, 0L, 0L);
+            records.add(new ProducerRecord<>(offsetSyncsTopic, sync.recordKey(), sync.recordValue()));
+        }
+        Map<String, Object> producerProps = new HashMap<>();
+        int sentRecords = 0;
+        try (KafkaProducer<byte[], byte[]> producer = cluster.kafka().createProducer(producerProps)) {
+            // Try to ensure that the contents of the offset syncs topic is too large to read before the checkpoint
+            // interval passes, so that the first checkpoint would take place before reading the whole contents of the
+            // sync topic. Experimentally, this test passes with <2x, and fails with >5x, without a fix for KAFKA-13659.
+            double consumeEfficiency = 10;
+            long deadline = System.currentTimeMillis() + (long) (consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS);
+            int nRecords = records.size();
+            while (System.currentTimeMillis() < deadline) {
+                producer.send(records.get(sentRecords % nRecords));
+                sentRecords++;
+            }
+            producer.flush();
+        }
+        log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic);
+    }
+
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {

Review Comment:
   I really like this method. Nice 👍



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -30,7 +30,7 @@ public class OffsetSyncStoreTest {
     static class FakeOffsetSyncStore extends OffsetSyncStore {
 
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();

Review Comment:
   A few comments:
   
   1. Some cases in this test suite and the `MirrorCheckpointTaskTest` suite are failing because the store isn't started. We might upgrade the visibility of the `OffsetSyncStore.readToEnd` field to `protected`, and expose a method in this class to set the value for that field (see the sketch after this list).
   2. Can we get some additional coverage in either/both of the above-mentioned test suites to make sure that a store that hasn't been started yet, or that takes a while to start, results in the expected lack of checkpoints (or translated downstream offsets)?
   3. The assertion messages for this test case need to be updated to match the values they're referring to (or, if you prefer, we can just remove specific values from the messages altogether since they can be easily deduced from the source code and line number if an assertion fails)
   4. For the `testOffsetTranslation` case, we might consider performing comparisons between two `OptionalLong` instances instead of unconditionally invoking `getAsLong` on the translated downstream offset. For example, `assertEquals(OptionalLong.of(201L), store.translateDownstream(tp, 150), "Failed to translate downstream offsets");`


