C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1099340592


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(

Review Comment:
   Nit: why "anyCheckpoint" instead of "allCheckpoints", given that we're 
ensuring that there's a checkpoint for every partition of the topic, instead of 
only one?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   I believe this part is correct, but it took me a while to grok the 
motivation here. Could we pull this out into two lines, one to determine the 
amount we bump by (i.e., 0 or 1) with a comment providing a rationale for the 
logic (including why we can't do a more aggressive bump derived from the delta 
between the upstream offsets for the consumer group and the checkpoint), and 
one to return the downstream offset plus that bump?
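
   Something along these lines is what I have in mind. This is only a rough 
sketch: `TranslationSketch`, `translate`, and the parameter names are 
hypothetical stand-ins rather than the real `OffsetSyncStore` code, and the 
rationale in the comment is my own reading of the change, so please correct it 
if I've misunderstood.

```java
import java.util.OptionalLong;

// Hypothetical standalone sketch; not the real OffsetSyncStore.
class TranslationSketch {

    /**
     * Translates an upstream consumer group offset using a single offset sync.
     * Assumes upstreamOffset >= syncUpstreamOffset; older offsets are handled separately.
     */
    static OptionalLong translate(long syncUpstreamOffset, long syncDownstreamOffset, long upstreamOffset) {
        // Bump by at most one. If the group sits exactly on the sync, the matching
        // downstream offset is known precisely. If the group is ahead of the sync, we
        // only know it has consumed the synced record: some upstream offsets in between
        // (for example, transaction markers) may have no downstream counterpart, so
        // adding the full upstream delta could overshoot and skip downstream records.
        long bump = upstreamOffset == syncUpstreamOffset ? 0 : 1;
        return OptionalLong.of(syncDownstreamOffset + bump);
    }
}
```

   With a sync of upstream 100 to downstream 40, a group at upstream 100 
translates to 40, and a group anywhere past 100 translates to 41.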



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   Good catch! Can we use a try-with-resources block for these consumers? 
Should set a precedent that makes it harder to leak them in the future.
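
   For reference, a rough sketch of the pattern I mean. `FakeConsumer` is a 
hypothetical stand-in so the snippet runs without a Kafka dependency (the real 
`Consumer` is `Closeable`, so the same shape should apply); the point is that 
`close()` runs even when an assertion inside the block throws.

```java
class TryWithResourcesSketch {

    // Hypothetical stand-in for a Kafka Consumer; only close() matters here.
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;

        int poll() {
            return 0; // pretend the poll returned no records
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Uses the consumer inside try-with-resources and reports whether it ended up closed.
    static boolean closedAfterUse(boolean failInside) {
        FakeConsumer handle = new FakeConsumer();
        try (FakeConsumer consumer = handle) {
            if (failInside) {
                throw new IllegalStateException("simulated assertion failure");
            }
            consumer.poll();
        } catch (IllegalStateException e) {
            // Caught only so this sketch can return the flag; close() has already run by now.
        }
        return handle.closed;
    }
}
```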



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -725,11 +730,20 @@ protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster
                 long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =

Review Comment:
   Nit: "offsets" and "totalOffsets" are a little generic and confusing; WDYT 
about "endOffsets" and "totalEndOffsets"?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   If I'm reading this correctly, we don't expect `tp2` to have translated 
offsets here because the upstream consumer group offset for this partition is 
behind the checkpoints emitted for it.
   
   If that's correct:
   1. Any idea why this was passing before?
   2. Do you think we should expand the test to commit an offset for this 
partition in the consumer group, then verify that that offset is included in 
the translated offsets afterward?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    @Override
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {

Review Comment:
   I was a little skeptical about the purpose of the `produce` method in the 
base class; after seeing this, its value is clear.
   
   Mind adding a brief Javadoc to the base class's `produce` method explaining 
what it does and why it may be useful to override in a subclass?
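
   As a rough illustration of the kind of Javadoc I'm picturing: the sketch 
below uses hypothetical stand-in classes (`BaseTest`, `TransactionalTest`) and 
drops the `EmbeddedConnectCluster` parameter so it runs on its own, and the 
wording is only a suggestion.

```java
import java.util.ArrayList;
import java.util.List;

class ProduceHookSketch {

    // Hypothetical stand-in for the base integration test.
    static class BaseTest {
        final List<String> log = new ArrayList<>();

        /**
         * Produces a single record to the given topic partition.
         * <p>
         * Every record written by these tests goes through this method, so a subclass
         * can override it to change how records are produced (for example, with a
         * transactional producer) without touching the individual test cases.
         */
        protected void produce(String topic, Integer partition, String key, String value) {
            log.add("plain:" + topic + "-" + partition + ":" + value);
        }

        // Mirrors the shape of produceMessages: all writes are routed through produce().
        void produceMessages(String topic, int numPartitions) {
            int cnt = 0;
            for (int p = 0; p < numPartitions; p++)
                produce(topic, p, "key", "value-" + cnt++);
        }
    }

    // Hypothetical stand-in for a subclass that swaps in a different producer.
    static class TransactionalTest extends BaseTest {
        @Override
        protected void produce(String topic, Integer partition, String key, String value) {
            log.add("txn:" + topic + "-" + partition + ":" + value);
        }
    }
}
```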



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   Why is this removed?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   Any idea how this test was passing before? It looks like with the wrong 
consumer group ID here, the old call to `waitForConsumerGroupOffsetSync` at 
line 481 should never have succeeded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.