
Gilles Degols commented on KAFKA-6014:

I would go for option 1, using a specific Exception to handle things correctly. 
Is there a problem to use the InvalidTopicException or do you think a more 
descriptive Exception would be needed? 

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---------------------------------------------------------------------------
>                 Key: KAFKA-6014
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6014
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this in 
> the old consumer because old consumer just silently drops failed offset 
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
> KafkaConsumer<>(props);
>         TopicPartition partition = new TopicPartition("t", 0);
>         List<TopicPartition> partitions = 
> Collections.singletonList(partition);
>         kafkaConsumer.assign(partitions);
>         while (true) {
>             kafkaConsumer.commitSync(Collections.singletonMap(partition, new 
> OffsetAndMetadata(0, "")));
>             Thread.sleep(1000);
>         }
>     }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
> Offset commit failed on partition t-0 at offset 0: This server does not host 
> this topic-partition. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
> t-0 may not exist or user may not have Describe access to topic
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
>   at 
> org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
> {code}
> A couple ways we could fix this:
> 1. make OffsetCommitResponseHandler throw a more specific exception and make 
> MirrorMaker.commitOffsets catch the exception. It currently just catches 
> WakeupException and CommitFailedException.
> 2. make OffsetCommitResponseHandler log the error and move on. This is 
> probably the simpler option. Just delete lines:
> {code}
> -                        future.raise(new KafkaException("Partition " + tp + 
> " may not exist or user may not have Describe access to topic"));
> -                        return;
> {code}

This message was sent by Atlassian JIRA

Reply via email to