[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107352681 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + +/** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, maybe null. + * @param key Kafka key for the record + * @param value Kafka value for the record + */ +protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { +cluster.produce(topic, partition, key, value); +} + +protected static Map waitForCheckpointOnAllPartitions( +MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName +) throws InterruptedException { +AtomicReference> ret = new AtomicReference<>(); +waitForCondition( +() -> { +Map offsets = client.remoteConsumerOffsets( +consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); +for (int i = 0; i < NUM_PARTITIONS; i++) { +if (!offsets.containsKey(new TopicPartition(topicName, i))) { +log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); +return false; +} +} +ret.set(offsets); +return true; +}, +CHECKPOINT_DURATION_MS, +String.format( +"Offsets for consumer group %s not translated from %s for topic %s", +consumerGroupName, +remoteClusterAlias, +topicName +) +); +return ret.get(); +} 
+ /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ -protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, -Consumer consumer, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) -throws InterruptedException { +protected static void waitForConsumerGroupFullSync( +EmbeddedConnectCluster connect, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation +) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { -List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +Map tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { -tps.add(new TopicPartition(topic, partitionIndex)); +tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get(); -long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream() +long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream() .mapToLong(OffsetAndMetadata::offset).sum(); -Map offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); -long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); - +Map endOffsets = +adminClient.listOffsets(tps).all().get(); +long totalEndOffsets = endOffsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum(); + +for (TopicPartition tp : endOffsets.keySet()) { +if (consumerGroupOffsets.containsKey(tp)) { +assertTrue(consumerGroup
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -392,6 +400,17 @@ protected Consumer createConsumer() { return new KafkaConsumer<>(consumerConfigs); } +/** + * Test whether a topic partition should be read by this log. + * Overridden by subclasses when only a subset of the assigned partitions should be read into memory. + * By default, this will read all partitions. Review Comment: Some nits: ```suggestion * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once * for every partition found in the log's backing topic. * This method can be overridden by subclasses when only a subset of the assigned partitions * should be read into memory. By default, all partitions are read. ``` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + +/** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, maybe null. 
+ * @param key Kafka key for the record + * @param value Kafka value for the record + */ +protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { +cluster.produce(topic, partition, key, value); +} + +protected static Map waitForCheckpointOnAllPartitions( +MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName +) throws InterruptedException { +AtomicReference> ret = new AtomicReference<>(); +waitForCondition( +() -> { +Map offsets = client.remoteConsumerOffsets( +consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); +for (int i = 0; i < NUM_PARTITIONS; i++) { +if (!offsets.containsKey(new TopicPartition(topicName, i))) { +log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); +return false; +} +} +ret.set(offsets); +return true; +}, +CHECKPOINT_DURATION_MS, +String.format( +"Offsets for consumer group %s not translated from %s for topic %s", +consumerGroupName, +remoteClusterAlias, +topicName +) +); +return ret.get(); +} + /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ -protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, -Consumer consumer, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) -throws InterruptedException { +protected static void waitForConsumerGroupFullSync( +EmbeddedConnectCluster connect, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation +) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { -List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +Map tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { -tps.add(new TopicPartition(topic, 
partitionIndex)); +tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata()
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107275404 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -134,9 +138,9 @@ public String version() { @Override public List poll() throws InterruptedException { try { -long deadline = System.currentTimeMillis() + interval.toMillis(); -while (!stopping && System.currentTimeMillis() < deadline) { -offsetSyncStore.update(pollTimeout); +if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) { Review Comment: Ah yeah, totally right, the condition was correct. Sorry about that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107273995 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,88 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; -import java.util.Map; -import java.util.HashMap; import java.util.Collections; -import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. 
*/ class OffsetSyncStore implements AutoCloseable { -private final KafkaConsumer consumer; -private final Map offsetSyncs = new HashMap<>(); -private final TopicPartition offsetSyncTopicPartition; +private final KafkaBasedLog backingStore; +private final Map offsetSyncs = new ConcurrentHashMap<>(); +private final TopicAdmin admin; +private volatile boolean readToEnd = false; OffsetSyncStore(MirrorCheckpointConfig config) { -consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), -new ByteArrayDeserializer(), new ByteArrayDeserializer()); -offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); -consumer.assign(Collections.singleton(offsetSyncTopicPartition)); +Consumer consumer = null; +TopicAdmin admin = null; +KafkaBasedLog store; +try { +Consumer finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig()); Review Comment: Ah yeah, much cleaner. Thanks! 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1106023715 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -134,9 +138,9 @@ public String version() { @Override public List poll() throws InterruptedException { try { -long deadline = System.currentTimeMillis() + interval.toMillis(); -while (!stopping && System.currentTimeMillis() < deadline) { -offsetSyncStore.update(pollTimeout); +if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) { Review Comment: Also, we changed the contract for `SourceTask::stop` a while ago to not be invoked on a separate thread from the one that invokes `SourceTask::poll` in order to fix KAFKA-10792, so I'm not sure how much value changing this to a `CountDownLatch` really adds here. Unless there's something else I'm missing here that necessitates this change, I think it's fine to leave `stopping` as a boolean field. As a follow-up, we could potentially tweak the logic here to pause for, e.g., a second and then return a `null` record batch from `SourceTask::poll` if the poll timeout hasn't elapsed since the last time we returned a non-null batch. But that should not happen in this PR, we're doing plenty here already. 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -75,18 +123,9 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr } } -// poll and handle records -synchronized void update(Duration pollTimeout) { -try { -consumer.poll(pollTimeout).forEach(this::handleRecord); -} catch (WakeupException e) { -// swallow -} -} - -public synchronized void close() { -consumer.wakeup(); -Utils.closeQuietly(consumer, "offset sync store consumer"); +public void close() { Review Comment: Nit: ```suggestion @Override public void close() { ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -134,9 +138,9 @@ public String version() { @Override public List poll() throws InterruptedException { try { -long deadline = System.currentTimeMillis() + interval.toMillis(); -while (!stopping && System.currentTimeMillis() < deadline) { -offsetSyncStore.update(pollTimeout); +if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) { Review Comment: Should this condition be inverted? 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,88 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; -import java.util.Map; -import java.util.HashMap; import java.util.Collections; -import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. 
*/ class OffsetSyncStore implements AutoCloseable { -private final KafkaConsumer consumer; -private final Map offsetSyncs = new HashMap<>(); -private final TopicPartition offsetSyncTopicPartition; +private final KafkaBasedLog backingStore; +private final Map offsetSyncs = new ConcurrentHashMap<>(); +private final TopicAdmin admin; +private volatile boolean readToEnd = false; OffsetSyncStore(MirrorCheckpointConfig config) { -consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), -new ByteArrayDeserializer(), new ByteArrayDeserializer()); -offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); -consumer.assign(Collections.singleton(offsetSyncTopicPartition)); +Consumer consumer = null; +TopicAdmin admin = null; +KafkaBasedLog store; +try { +Consumer finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig()); Review Comment: This is fairly unclean but I can't think of a significantly better alternative... guess it's
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1104643146 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,76 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; import java.util.Map; import java.util.HashMap; -import java.util.Collections; import java.time.Duration; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ class OffsetSyncStore implements AutoCloseable { -private final KafkaConsumer consumer; +private final KafkaBasedLog backingStore; private final Map offsetSyncs = new HashMap<>(); -private final TopicPartition offsetSyncTopicPartition; +private final TopicAdmin admin; OffsetSyncStore(MirrorCheckpointConfig config) { -consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), -new ByteArrayDeserializer(), new ByteArrayDeserializer()); -offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); Review Comment: This leads to a change in behavior since we'll end up consuming from all partitions in the offset syncs topic instead of just partition 0. 
We intentionally [write every offset sync to partition zero](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L249) and [create the topic with a single partition](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L361), but the topic may have been created out-of-band and there may be other information in it which has not been produced by MM2 that we shouldn't consume. Could we expand the `KafkaBasedLog` API to support reading from a specific subset of the partitions for a topic, possibly by adding a `protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos)` method that can be overridden by subclasses? This would allow us to completely preserve the existing behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1104664513 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,76 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; import java.util.Map; import java.util.HashMap; -import java.util.Collections; import java.time.Duration; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. 
*/ class OffsetSyncStore implements AutoCloseable { -private final KafkaConsumer consumer; +private final KafkaBasedLog backingStore; private final Map offsetSyncs = new HashMap<>(); -private final TopicPartition offsetSyncTopicPartition; +private final TopicAdmin admin; OffsetSyncStore(MirrorCheckpointConfig config) { -consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), -new ByteArrayDeserializer(), new ByteArrayDeserializer()); -offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); -consumer.assign(Collections.singleton(offsetSyncTopicPartition)); +String topic = config.offsetSyncsTopic(); +Consumer consumer = new KafkaConsumer<>( +config.offsetSyncsTopicConsumerConfig(), +new ByteArrayDeserializer(), +new ByteArrayDeserializer()); +this.admin = new TopicAdmin( + config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), +config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())); +KafkaBasedLog store = null; +try { +store = KafkaBasedLog.withExistingClients( +topic, +consumer, +null, +new TopicAdmin( + config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())), +(error, record) -> this.handleRecord(record), +Time.SYSTEM, +ignored -> { +}); +store.start(); +} catch (Throwable t) { +Utils.closeQuietly(store != null ? store::stop : null, "backing store"); Review Comment: If the `KafkaBasedLog` constructor for `store` fails, won't `store` be null, causing `consumer` to be leaked? 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr } // poll and handle records -synchronized void update(Duration pollTimeout) { +synchronized void update(Duration pollTimeout) throws TimeoutException { try { -consumer.poll(pollTimeout).forEach(this::handleRecord); -} catch (WakeupException e) { +backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS); +} catch (WakeupException | InterruptedException | ExecutionException e) { // swallow } } public synchronized void close() { -consumer.wakeup(); -Utils.closeQuietly(consumer, "offset sync store consumer"); +Utils.closeQuietly(backingStore::stop, "offset sync store kafka based log"); Review Comment: Possible NPE: ```suggestion Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "offset sync store kafka based log"); ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,76 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer;