[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107352681


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, may be null
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                         adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream()
                         .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroup

[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -392,6 +400,17 @@ protected Consumer<K, V> createConsumer() {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Test whether a topic partition should be read by this log.
+     * Overridden by subclasses when only a subset of the assigned partitions should be read into memory.
+     * By default, this will read all partitions.

Review Comment:
   Some nits:
   
   ```suggestion
     * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
     * for every partition found in the log's backing topic.
     * This method can be overridden by subclasses when only a subset of the assigned partitions
     * should be read into memory. By default, all partitions are read.
   ```
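
For reference, a minimal sketch of how a subclass could use this hook. The excerpt above is cut off before the method signature, so the name `readPartition(TopicPartition)` and the seven-argument `KafkaBasedLog` constructor used here are assumptions, not confirmed by this diff:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;

public class PartitionZeroLogExample {
    // Sketch: a KafkaBasedLog that only reads partition 0 of its backing topic.
    // The readPartition(...) override name is assumed from the javadoc above.
    static KafkaBasedLog<byte[], byte[]> partitionZeroLog(
            String topic,
            Map<String, Object> producerConfigs,
            Map<String, Object> consumerConfigs,
            TopicAdmin admin,
            Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
        return new KafkaBasedLog<byte[], byte[]>(
                topic, producerConfigs, consumerConfigs, () -> admin,
                consumedCallback, Time.SYSTEM, ignored -> { }) {
            @Override
            protected boolean readPartition(TopicPartition topicPartition) {
                // Ignore records in any partition other than 0, e.g. partitions
                // created out-of-band on the offset-syncs topic.
                return topicPartition.partition() == 0;
            }
        };
    }
}
```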



[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107275404


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Ah yeah, totally right, the condition was correct. Sorry about that!






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107273995


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-                new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   Ah yeah, much cleaner. Thanks! 👍






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-14 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1106023715


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Also, we changed the contract for `SourceTask::stop` a while ago to not be invoked on a separate thread from the one that invokes `SourceTask::poll` in order to fix KAFKA-10792, so I'm not sure how much value changing this to a `CountDownLatch` really adds here.
   
   Unless there's something else I'm missing here that necessitates this change, I think it's fine to leave `stopping` as a boolean field.
   
   As a follow-up, we could potentially tweak the logic here to pause for, e.g., a second and then return a `null` record batch from `SourceTask::poll` if the poll timeout hasn't elapsed since the last time we returned a non-null batch. But that should not happen in this PR, we're doing plenty here already.
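
For concreteness, a rough sketch of that follow-up idea (explicitly not part of this PR) as it might look inside `MirrorCheckpointTask`; `pollTimeout` comes from the surrounding code, while `lastNonNullBatchMs` and `checkpointRecords()` are illustrative names:

```java
// Illustrative fragment only; field and helper names are hypothetical.
private long lastNonNullBatchMs = 0;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    long now = System.currentTimeMillis();
    if (now - lastNonNullBatchMs < pollTimeout.toMillis()) {
        Thread.sleep(1000); // pause ~1 second between idle polls
        return null;        // SourceTask::poll may return null when there is nothing to send
    }
    lastNonNullBatchMs = now;
    return checkpointRecords(); // hypothetical helper that builds the checkpoint batch
}
```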



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -75,18 +123,9 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    public void close() {

Review Comment:
   Nit:
   ```suggestion
       @Override
       public void close() {
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Should this condition be inverted?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-                new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
Review Comment:
   This is fairly unclean but I can't think of a significantly better alternative... guess it's

[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-13 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1104643146


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-                new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);

Review Comment:
   This leads to a change in behavior since we'll end up consuming from all partitions in the offset syncs topic instead of just partition 0.
   
   We intentionally [write every offset sync to partition zero](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L249) and [create the topic with a single partition](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L361), but the topic may have been created out-of-band and there may be other information in it which has not been produced by MM2 that we shouldn't consume.
   
   Could we expand the `KafkaBasedLog` API to support reading from a specific subset of the partitions for a topic, possibly by adding a `protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos)` method that can be overridden by subclasses? This would allow us to completely preserve the existing behavior.
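
A minimal sketch of what that override could look like in the offset sync store's backing log, under the assumption that the proposed `assignedPartitions` hook were added to `KafkaBasedLog` (it is only a suggestion in this thread, so the signature is hypothetical; imports shown for `PartitionInfo`, `TopicPartition`, and `Collectors`):

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical override on a KafkaBasedLog subclass: keep only partition 0,
// the only partition MM2 writes offset syncs to, and ignore partitions that
// may have been added to the topic out-of-band.
@Override
protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos) {
    return partitionInfos.stream()
            .filter(info -> info.partition() == 0)
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toList());
}
```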






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-13 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1104664513


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-                new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");

Review Comment:
   If the `KafkaBasedLog` constructor for `store` fails, won't `store` be null, causing `consumer` to be leaked?
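
One hedged way to plug that leak, reusing `topic`, `admin`, and `config` from the constructor shown in the diff above (the rethrow and cleanup ordering are a sketch, not the PR's final code):

```java
Consumer<byte[], byte[]> consumer = null;
KafkaBasedLog<byte[], byte[]> store = null;
try {
    consumer = new KafkaConsumer<>(
            config.offsetSyncsTopicConsumerConfig(),
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer());
    store = KafkaBasedLog.withExistingClients(
            topic, consumer, null, admin,
            (error, record) -> this.handleRecord(record),
            Time.SYSTEM, ignored -> { });
    store.start();
} catch (Throwable t) {
    // Close whichever resources were actually created before rethrowing.
    // KafkaConsumer.close() is idempotent, so closing the consumer directly is
    // safe even if store.stop() already closed it.
    Utils.closeQuietly(store != null ? store::stop : null, "backing store");
    Utils.closeQuietly(consumer, "offset syncs topic consumer");
    throw t;
}
```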



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow
         }
     }
 
     public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+        Utils.closeQuietly(backingStore::stop, "offset sync store kafka based log");

Review Comment:
   Possible NPE:
   ```suggestion
        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "offset sync store kafka based log");
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;