[ https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reopened KAFKA-6014: ------------------------------------ > new consumer mirror maker halts after committing offsets to a deleted topic > --------------------------------------------------------------------------- > > Key: KAFKA-6014 > URL: https://issues.apache.org/jira/browse/KAFKA-6014 > Project: Kafka > Issue Type: Bug > Reporter: Onur Karaman > Assignee: Jason Gustafson > Priority: Major > > New consumer throws an unexpected KafkaException when trying to commit to a > topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to > catch the KafkaException and just kills the process. We didn't see this in > the old consumer because old consumer just silently drops failed offset > commits. > I ran a quick experiment locally to prove the behavior. The experiment: > 1. start up a single broker > 2. create a single-partition topic t > 3. create a new consumer that consumes topic t > 4. make the consumer commit every few seconds > 5. delete topic t > 6. expect: KafkaException that kills the process. > Here's my script: > {code} > package org.apache.kafka.clients.consumer; > import org.apache.kafka.common.TopicPartition; > import java.util.Collections; > import java.util.List; > import java.util.Properties; > public class OffsetCommitTopicDeletionTest { > public static void main(String[] args) throws InterruptedException { > Properties props = new Properties(); > props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9090"); > props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g"); > props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer"); > props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.ByteArrayDeserializer"); > props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); > KafkaConsumer<byte[], byte[]> kafkaConsumer = new > KafkaConsumer<>(props); > TopicPartition partition = new TopicPartition("t", 0); > List<TopicPartition> partitions = > Collections.singletonList(partition); > kafkaConsumer.assign(partitions); > while (true) { > kafkaConsumer.commitSync(Collections.singletonMap(partition, new > OffsetAndMetadata(0, ""))); > Thread.sleep(1000); > } > } > } > {code} > Here are the other commands: > {code} > > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs* > > ./gradlew clean jar > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh > > config/server0.properties > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t > > --partitions 1 --replication-factor 1 > > ./bin/kafka-run-class.sh > > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t > {code} > Here is the output: > {code} > [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] > Offset commit failed on partition t-0 at offset 0: This server does not host > this topic-partition. > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > Exception in thread "main" org.apache.kafka.common.KafkaException: Partition > t-0 may not exist or user may not have Describe access to topic > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22) > {code} > A couple ways we could fix this: > 1. make OffsetCommitResponseHandler throw a more specific exception and make > MirrorMaker.commitOffsets catch the exception. It currently just catches > WakeupException and CommitFailedException. > 2. make OffsetCommitResponseHandler log the error and move on. This is > probably the simpler option. Just delete lines: > {code} > - future.raise(new KafkaException("Partition " + tp + > " may not exist or user may not have Describe access to topic")); > - return; > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)