C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1106023715


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Also, we changed the contract for `SourceTask::stop` a while ago (to fix 
KAFKA-10792) so that it is no longer invoked on a separate thread from the one 
that invokes `SourceTask::poll`, so I'm not sure how much value changing this 
to a `CountDownLatch` really adds here.
   
   Unless there's something else I'm missing here that necessitates this 
change, I think it's fine to leave `stopping` as a boolean field.
   
   As a follow-up, we could potentially tweak the logic here to pause for, 
e.g., a second and then return a `null` record batch from `SourceTask::poll` if 
the poll timeout hasn't elapsed since the last time we returned a non-null 
batch. But that should not happen in this PR; we're doing plenty here already.
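   
   To make the follow-up idea concrete, here is a rough sketch of what I have 
in mind. It is not code from this PR: `ThrottledPoller`, the injected clock, 
the `String` stand-in for records, and the pause length are all assumptions 
made purely for illustration.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.function.LongSupplier;

   // Hypothetical sketch, not the PR's code: all names here are assumptions.
   class ThrottledPoller {
       private final long pollTimeoutMs;
       private final long pauseMs;
       private final LongSupplier clockMs;
       private long lastBatchMs;

       ThrottledPoller(long pollTimeoutMs, long pauseMs, LongSupplier clockMs) {
           this.pollTimeoutMs = pollTimeoutMs;
           this.pauseMs = pauseMs;
           this.clockMs = clockMs;
           this.lastBatchMs = clockMs.getAsLong();
       }

       // Returns null ("no records yet") until pollTimeoutMs has elapsed since the
       // last non-null batch; otherwise returns a one-element placeholder batch.
       List<String> poll() {
           if (clockMs.getAsLong() - lastBatchMs < pollTimeoutMs) {
               try {
                   Thread.sleep(pauseMs); // brief pause so the caller does not spin
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt(); // preserve the interrupt for the caller
               }
               return null;
           }
           lastBatchMs = clockMs.getAsLong();
           return Collections.singletonList("checkpoint"); // stand-in for real records
       }
   }
   ```

   Something like `new ThrottledPoller(5000, 1000, System::currentTimeMillis)` 
would give the one-second pause described above, and the task would never 
block in `poll` for much longer than that pause.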



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -75,18 +123,9 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    public void close() {

Review Comment:
   Nit:
   ```suggestion
       @Override
       public void close() {
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Should this condition be inverted?
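   
   For context on why I ask: `CountDownLatch::await(long, TimeUnit)` returns 
`true` if the count reached zero before the timeout and `false` if the timeout 
elapsed first. A minimal sketch of that behavior follows; `LatchAwaitDemo` and 
`stopRequested` are hypothetical names, and I'm assuming `stopping` is counted 
down when the task is stopped.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   // Hypothetical demo class, not part of the PR.
   class LatchAwaitDemo {
       // True if the latch reached zero within the timeout (a stop was requested),
       // false if the timeout elapsed first (no stop was requested yet).
       static boolean stopRequested(CountDownLatch stopping, long timeoutMs) {
           try {
               return stopping.await(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return true; // assumption: treat interruption as a request to stop
           }
       }

       public static void main(String[] args) {
           CountDownLatch stopping = new CountDownLatch(1);
           System.out.println(stopRequested(stopping, 10)); // timeout elapses first
           stopping.countDown();                            // what a stop call would do
           System.out.println(stopRequested(stopping, 10)); // latch is already at zero
       }
   }
   ```

   So whichever branch is meant to run while the task is still active should 
be the one taken when `await` returns `false`.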



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   This is fairly unclean, but I can't think of a significantly better 
alternative. I guess it's good enough for now.
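   
   For anyone reading along, the double assignment exists because a variable 
captured by a lambda or anonymous class must be effectively final, while the 
`catch` block needs a variable that is reassigned inside the `try`. A reduced 
sketch of the pattern follows; `CaptureWithCleanupDemo` and `Resource` are 
hypothetical stand-ins, not classes from the PR.

   ```java
   import java.util.function.Supplier;

   // Hypothetical illustration of the pattern, not the PR's code.
   class CaptureWithCleanupDemo {
       static class Resource implements AutoCloseable {
           boolean closed = false;

           @Override
           public void close() {
               closed = true;
           }
       }

       // Builds a supplier that captures the resource, closing it if setup fails.
       static Supplier<Resource> build(Resource toUse, boolean failAfterOpen) {
           Resource resource = null; // reassigned below, so a lambda cannot capture it
           try {
               // The effectively final alias is what the lambda captures.
               Resource finalResource = resource = toUse;
               if (failAfterOpen) {
                   throw new IllegalStateException("simulated failure after opening the resource");
               }
               return () -> finalResource;
           } catch (RuntimeException e) {
               if (resource != null) {
                   resource.close(); // cleanup goes through the outer, mutable variable
               }
               throw e;
           }
       }
   }
   ```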



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            TopicAdmin finalAdmin = admin = new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+            store = new KafkaBasedLog<byte[], byte[]>(
+                    config.offsetSyncsTopic(),
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    () -> finalAdmin,
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    }) {
+
+                @Override
+                protected Producer<byte[], byte[]> createProducer() {
+                    return null;
+                }
+
+                @Override
+                protected Consumer<byte[], byte[]> createConsumer() {
+                    return finalConsumer;
+                }
+
+                @Override
+                protected boolean readPartition(TopicPartition topicPartition) {
+                    return topicPartition.partition() == 0;
+                }
+            };
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
+    }
+
+    public void start() {

Review Comment:
   Can we add a Javadoc here? At a minimum, we should make it clear that this 
performs a synchronous read to the end of the topic.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -392,6 +401,10 @@ protected Consumer<K, V> createConsumer() {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    protected boolean readPartition(TopicPartition topicPartition) {

Review Comment:
   Nit: add a Javadoc explaining when this method will be used, what it will be 
used for, and that it's designed to be overridden by subclasses?
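   
   Something along these lines would work for me. This is only a sketch: 
`PartitionFilteringLog` and `FirstPartitionOnlyLog` are hypothetical stand-ins 
that take a bare partition number instead of a `TopicPartition`, and the 
Javadoc wording is a suggestion, not final text.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for the log class; only the hook and its Javadoc matter.
   class PartitionFilteringLog {
       /**
        * Signals whether a partition of the topic should be read by this log.
        * Invoked during startup, once for each partition found in the topic.
        * Designed to be overridden by subclasses that only need a subset of the
        * partitions; by default, every partition is read.
        *
        * @param partition the partition number under consideration
        * @return true if the partition should be read, false if it should be skipped
        */
       protected boolean readPartition(int partition) {
           return true;
       }

       // Applies the hook to each partition, like the filtering loop in start().
       List<Integer> selectPartitions(int partitionCount) {
           List<Integer> selected = new ArrayList<>();
           for (int partition = 0; partition < partitionCount; partition++) {
               if (readPartition(partition)) {
                   selected.add(partition);
               }
           }
           return selected;
       }
   }

   // Example override: only partition 0 is read.
   class FirstPartitionOnlyLog extends PartitionFilteringLog {
       @Override
       protected boolean readPartition(int partition) {
           return partition == 0;
       }
   }
   ```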



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -257,8 +257,17 @@ public void start() {
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+            if (readPartition(topicPartition)) {
+                partitions.add(topicPartition);
+            }
+        }
+        if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+                    "required filter. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken too long to create.");

Review Comment:
   Under what realistic circumstances would connectivity issues, unavailable 
topic partitions, or a lag in topic metadata propagation after topic creation 
lead to this?
   
   IMO it's fine to keep just the first sentence here.
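   
   Roughly, the message would then read as below. `PartitionFilterMessages` is 
a hypothetical helper used only to show the shortened wording; in the PR the 
string would stay inline in the `ConnectException`.

   ```java
   // Hypothetical helper, not the PR's code; it only illustrates the shorter message.
   class PartitionFilterMessages {
       static String noMatchingPartitions(String topic) {
           return "Some partitions for " + topic
                   + " exist, but no partitions matched the required filter.";
       }
   }
   ```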



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
