gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240
##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster
cluster, String topicName,
int cnt = 0;
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster, topicName, p, "key", "value-" + cnt++);
}
-
+
+protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+cluster.kafka().produce(topic, partition, key, value);
+}
+
+protected static Map
waitForAnyCheckpoint(
Review Comment:
It's `any` in contrast to the other method's `FullSync`.
As in, it accepts checkpoints anywhere within a topic, not just at the end
of the topic.
I'll update the method name to be more precise and verbose :)
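The difference between the two wait conditions can be sketched roughly as follows. This is a minimal illustration, not the helper code in this PR: the class `CheckpointConditionSketch`, its method names, and the plain maps keyed by a partition name are all assumptions made for the example.

```java
import java.util.Map;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class CheckpointConditionSketch {

    // "Any" condition: every partition has a checkpoint, at whatever offset.
    static boolean anyCheckpoint(Map<String, Long> checkpoints, Map<String, Long> endOffsets) {
        return checkpoints.keySet().containsAll(endOffsets.keySet());
    }

    // "Full sync" condition: every partition has a checkpoint exactly at the end of the topic.
    static boolean fullSync(Map<String, Long> checkpoints, Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long checkpointed = checkpoints.get(end.getKey());
            if (checkpointed == null || checkpointed.longValue() != end.getValue().longValue()) {
                return false;
            }
        }
        return true;
    }
}
```

A checkpoint at offset 4 in a partition whose end offset is 10 satisfies `anyCheckpoint` but not `fullSync`.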
##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync()
throws InterruptedExceptio
// have been automatically synchronized from primary to backup by the background job, so no
// more records to consume from the replicated topic by the same consumer group at backup cluster
assertEquals(0, records.count(), "consumer record size is not zero");
+backupConsumer.close();
Review Comment:
I code-golfed this one, but there's no reason to. Updated.
##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored()
throws InterruptedExceptio
Map translatedOffsets = backupClient.remoteConsumerOffsets(
consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
- translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+ !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
Review Comment:
1. This was passing before, because `offset.lag.max` was nonzero, leaving a
(0,0) offset sync in the topic.
2. This condition is part of
`waitForAnyCheckpoint/waitForCheckpointOnAllPartitions` which is used in
`testReplication`. I think that test should be sufficient to cover the normal
case where offsets are translated.
##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws
Exception {
produceMessages(primary, "test-topic-1");
-ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-// Check offsets are pushed to the checkpoint topic
-Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-waitForCondition(() -> {
-ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L));
-for (ConsumerRecord record : records) {
-Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-return true;
-}
-}
-return false;
-}, 30_000,
-"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-);
Review Comment:
This assertion relied on a nonzero `offset.lag.max`, where the sync topic
could be left with a (0,0) record.
The offset being translated in this case is the `consumer-group-dummy` from
the setup method, which has offset 0 (at the beginning of the topic). Now that
`offset.lag.max` is zero and the starvation bug is fixed, the sync topic may
only see nonzero syncs, which are not able to translate 0 offsets.
I removed this assertion because it was failing and didn't seem to concern
this test.
Upon further inspection, maybe it makes sense to turn this test into a
parameter for other tests, to verify that checkpoints still work when the
syncs topic is moved around. Does that make sense to do in this PR?
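For reference, the reason a nonzero sync cannot translate offset 0 can be modeled roughly as below. This is a simplified sketch under stated assumptions, not MirrorMaker's actual translation code: the class `OffsetSyncSketch`, the nested `Sync` type, and the `translate` method are hypothetical, and the value returned for offsets past the sync is only a conservative lower bound.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model; not MirrorMaker's actual translation code.
final class OffsetSyncSketch {

    // One (upstream, downstream) offset pair read from the syncs topic.
    static final class Sync {
        final long upstreamOffset;
        final long downstreamOffset;

        Sync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // A consumer offset that lies before the sync's upstream offset cannot be translated.
    // Otherwise return the sync's downstream offset as a conservative lower bound
    // (assumption: the real arithmetic may differ).
    static OptionalLong translate(Sync sync, long consumerOffset) {
        if (consumerOffset < sync.upstreamOffset) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(sync.downstreamOffset);
    }
}
```

With `new Sync(0, 0)`, `translate(sync, 0)` yields a value, which is why the old assertion passed. With only a nonzero sync such as `new Sync(5, 5)`, `translate(sync, 0)` is empty, so no checkpoint is produced for the `consumer-group-dummy` group at offset 0.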
##
connect/mirror/src/test/java/org/