C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427
########## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ########## @@ -392,6 +400,17 @@ protected Consumer<K, V> createConsumer() { return new KafkaConsumer<>(consumerConfigs); } + /** + * Test whether a topic partition should be read by this log. + * <p>Overridden by subclasses when only a subset of the assigned partitions should be read into memory. + * By default, this will read all partitions. Review Comment: Some nits: ```suggestion * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once * for every partition found in the log's backing topic. * <p>This method can be overridden by subclasses when only a subset of the assigned partitions * should be read into memory. By default, all partitions are read. ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) - cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); + produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + + /** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, may be null.
+ * @param key Kafka key for the record + * @param value Kafka value for the record + */ + protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { + cluster.produce(topic, partition, key, value); + } + + protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions( + MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName + ) throws InterruptedException { + AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>(); + waitForCondition( + () -> { + Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets( + consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); + for (int i = 0; i < NUM_PARTITIONS; i++) { + if (!offsets.containsKey(new TopicPartition(topicName, i))) { + log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); + return false; + } + } + ret.set(offsets); + return true; + }, + CHECKPOINT_DURATION_MS, + String.format( + "Offsets for consumer group %s not translated from %s for topic %s", + consumerGroupName, + remoteClusterAlias, + topicName + ) + ); + return ret.get(); + } + /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ - protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, - Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) - throws InterruptedException { + protected static <T> void waitForConsumerGroupFullSync( + EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation + ) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { - List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); + Map<TopicPartition, 
OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { - tps.add(new TopicPartition(topic, partitionIndex)); + tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get(); - long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream() + long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream() .mapToLong(OffsetAndMetadata::offset).sum(); - Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); - long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); - + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = + adminClient.listOffsets(tps).all().get(); + long totalEndOffsets = endOffsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum(); + + for (TopicPartition tp : endOffsets.keySet()) { + if (consumerGroupOffsets.containsKey(tp)) { + assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(), + "Consumer group committed downstream offsets beyond the log end, this would lead to negative lag metrics" + ); + } + } boolean totalOffsetsMatch = exactOffsetTranslation - ? totalOffsets == expectedTotalOffsets - : totalOffsets >= expectedTotalOffsets; + ? totalEndOffsets == expectedTotalOffsets + : totalEndOffsets >= expectedTotalOffsets; + + boolean consumerGroupOffsetsMatch = exactOffsetTranslation + ? 
+ totalConsumerGroupOffsets == expectedTotalOffsets + : totalConsumerGroupOffsets >= expectedTotalOffsets; // make sure the consumer group offsets are synced to expected number - return totalOffsetsMatch && consumerGroupOffsetTotal > 0; + return totalOffsetsMatch && consumerGroupOffsetsMatch; }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } } + protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster cluster, String offsetSyncsTopic, String topic, int partitions) throws InterruptedException { + waitForTopicCreated(cluster, offsetSyncsTopic); + // Insert a large number of offset sync records into the offset syncs topic to simulate + // a long-lived MM2 instance that has replicated many offsets in the past. + List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>(); + for (int partition = 0; partition < partitions; partition++) { + TopicPartition tp = new TopicPartition(topic, partition); + OffsetSync sync = new OffsetSync(tp, 0L, 0L); + records.add(new ProducerRecord<>(offsetSyncsTopic, sync.recordKey(), sync.recordValue())); + } + Map<String, Object> producerProps = new HashMap<>(); + int sentRecords = 0; + try (KafkaProducer<byte[], byte[]> producer = cluster.kafka().createProducer(producerProps)) { + // Try to ensure that the contents of the offset syncs topic are too large to read before the checkpoint + // interval passes, so that the first checkpoint would take place before reading the whole contents of the + // sync topic. Experimentally, this test passes with <2x, and fails with >5x, without a fix for KAFKA-13659.
+ double consumeEfficiency = 10; + long deadline = System.currentTimeMillis() + (long) (consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS); + int nRecords = records.size(); + while (System.currentTimeMillis() < deadline) { + producer.send(records.get(sentRecords % nRecords)); + sentRecords++; + } + producer.flush(); + } + log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic); + } + + protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) { Review Comment: I really like this method. Nice 👍 ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -30,7 +30,7 @@ public class OffsetSyncStoreTest { static class FakeOffsetSyncStore extends OffsetSyncStore { FakeOffsetSyncStore() { - super(null, null); + super(); Review Comment: A few comments: 1. Some cases in this test suite and the `MirrorCheckpointTaskTest` suite are failing because the store isn't started. We might upgrade the visibility of the `OffsetSyncStore.readToEnd` field to `protected`, and expose a method in this class to set the value for that field. 2. Can we get some additional coverage in either/both of the above-mentioned test suites to make sure that a store that hasn't been started yet, or that takes a while to start, results in the expected lack of checkpoints (or translated downstream offsets)? 3. The assertion messages for this test case need to be updated to match the values they're referring to (or, if you prefer, we can just remove specific values from the messages altogether since they can be easily deduced from the source code and line number if an assertion fails). 4. For the `testOffsetTranslation` case, we might consider performing comparisons between two `OptionalLong` instances instead of unconditionally invoking `getAsLong` on the translated downstream offset.
For example, `assertEquals(OptionalLong.of(201L), store.translateDownstream(tp, 150), "Failed to translate downstream offsets");` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org