[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100680887


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   I think that makes sense.
   I'll add some explicitly committed offsets here as a quick fix, and re-add the assertion.
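
   For context, a minimal sketch of what "explicitly committed offsets" could look like at this point in the test, assuming the embedded cluster's `createConsumer` helper and the test's `NUM_RECORDS_PER_PARTITION` constant; the group name here is hypothetical:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // After produceMessages(primary, "test-topic-1"): commit a nonzero offset
    // so the checkpoint connector has a translatable position even with
    // offset.lag.max=0 (a zero offset falls before every nonzero sync).
    try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(
            Collections.singletonMap("group.id", "consumer-group-checkpoint-test"))) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition("test-topic-1", 0),
                new OffsetAndMetadata(NUM_RECORDS_PER_PARTITION)));
    }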






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100622077


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
             // Offset is too far in the past to translate accurately
             return OptionalLong.of(-1L);
         }
-        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-        return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+        return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   Let me know if this description makes sense to you, and if the diagram adds any value.
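
   To make the rule concrete, here is a minimal, self-contained sketch of the new translation semantics, using a hypothetical `OffsetSync` record rather than the real MM2 classes:

    import java.util.OptionalLong;

    public class TranslationSketch {
        // Hypothetical stand-in for org.apache.kafka.connect.mirror.OffsetSync
        record OffsetSync(long upstreamOffset, long downstreamOffset) { }

        // An upstream offset exactly at the sync translates exactly; anything
        // past the sync is only bounded from below by downstream + 1. This
        // conservative answer can never exceed the true downstream position,
        // so the reported downstream lag can no longer go negative.
        static OptionalLong translate(OffsetSync sync, long upstreamOffset) {
            if (upstreamOffset < sync.upstreamOffset()) {
                return OptionalLong.of(-1L); // too far in the past to translate
            }
            return OptionalLong.of(sync.downstreamOffset()
                    + (sync.upstreamOffset() == upstreamOffset ? 0 : 1));
        }

        public static void main(String[] args) {
            OffsetSync sync = new OffsetSync(100, 42);
            System.out.println(translate(sync, 100)); // OptionalLong[42] exact match
            System.out.println(translate(sync, 150)); // OptionalLong[43] conservative
            System.out.println(translate(sync, 50));  // OptionalLong[-1] before the sync
        }
    }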






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-08 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, Checkpoint> waitForAnyCheckpoint(

Review Comment:
   It's `any` in contrast to the other method's `FullSync`.
   As in, it accepts checkpoints anywhere within a topic, not just at the end of the topic.
   
   I'll update the method name to be more precise and verbose :)
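
   A hedged sketch of what such a helper might look like, assuming the test's `MirrorClient` and the `TestUtils.waitForCondition` utility; the exact name and signature in the PR may differ:

    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.MirrorClient;
    import static org.apache.kafka.test.TestUtils.waitForCondition;

    // Wait until the group has at least one translated checkpoint, wherever
    // in the topic it points (hence "any", in contrast to waiting for a full
    // end-of-topic sync).
    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(
            MirrorClient client, String group, String sourceAlias) throws InterruptedException {
        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> offsets = new AtomicReference<>();
        waitForCondition(() -> {
            offsets.set(client.remoteConsumerOffsets(group, sourceAlias, Duration.ofSeconds(30L)));
            return !offsets.get().isEmpty();
        }, 30_000, "No checkpoint was emitted for group " + group);
        return offsets.get();
    }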



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   I code-golfed this one, but there's no reason to. Updated.
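
   Presumably the un-golfed version scopes the consumer in a try-with-resources block, something like this sketch (the properties map and poll timeout are illustrative):

    // Closing via try-with-resources ensures the consumer is released even
    // if the assertion throws; consumerProps is whatever the test already builds.
    try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
            consumerProps, "test-topic-1")) {
        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(5L));
        assertEquals(0, records.count(), "consumer record size is not zero");
    }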



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedException
         Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                 consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
         return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-               translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+               !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   1. This was passing before, because `offset.lag.max` was nonzero, leaving a (0,0) offset sync in the topic.
   2. This condition is part of `waitForAnyCheckpoint`/`waitForCheckpointOnAllPartitions`, which is used in `testReplication`. I think that test should be sufficient to cover the normal case where offsets are translated.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   This assertion relied on a nonzero `offset.lag.max`, where the sync topic could be left with a (0,0) record.
   The offset being translated in this case is the `consumer-group-dummy` from the setup method, which has offset 0 (at the beginning of the topic). Now that `offset.lag.max` is zero and the starvation bug is fixed, the sync topic may only see nonzero syncs, which are not able to translate 0 offsets.
   
   I removed this assertion because it was failing and didn't seem to concern this test.
   Upon further inspection, maybe this test makes sense to turn into a parameter for other tests, to verify that the checkpoints still function when the syncs topic is moved around. Does that make sense to do in this PR?
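
   A worked example of the two regimes described above, reusing the assumed translation rule from the OffsetSyncStore sketch earlier in this thread:

    import java.util.OptionalLong;

    public class ZeroOffsetDemo {
        static OptionalLong translate(long syncUpstream, long syncDownstream, long upstream) {
            if (upstream < syncUpstream) return OptionalLong.of(-1L); // before the earliest sync
            return OptionalLong.of(syncDownstream + (syncUpstream == upstream ? 0 : 1));
        }

        public static void main(String[] args) {
            // Before: a nonzero offset.lag.max could leave a (0,0) sync in the
            // topic, so the dummy group's offset 0 translated exactly to 0.
            System.out.println(translate(0, 0, 0));     // OptionalLong[0]
            // After: offset.lag.max=0 means only nonzero syncs are emitted, and
            // offset 0 falls before all of them, so it cannot be translated.
            System.out.println(translate(100, 100, 0)); // OptionalLong[-1]
        }
    }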



##
connect/mirror/src/test/java/org/