[ https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen resolved KAFKA-9999.
--------------------------------
    Resolution: Won't Fix

> Internal topic creation failure should be non-fatal and trigger explicit rebalance
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9999
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, streams
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> We spotted a case in a system test failure where the topic already exists but the admin client still attempts to recreate it:
>
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
>     at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
> {code}
> Looking closer, it seems that, as of today, we cannot tell whether a topic is pending deletion or running properly. We could discuss a follow-up effort to reflect that information as part of the topic description result.
> The current solution to this problem is to explicitly trigger a rebalance when we run out of retries, to unblock the group, as the short-term unavailability is more likely a broker-side unavailability.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)