Hi there, Thanks for the report. What version of Kafka are you using? Also, between runs do you change the number of partitions for your topics? I’m trying to figure out how this problem happens, any information on what is changing in between runs is appreciated.
Thanks, Eno > On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote: > > Hello, > > When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have randomly > & frequently below exceptions: > In my opinion, it is not practical to clean up the invalid partitions > everydays. For your information, this partition is an internal partition > that automatically created by KafkaStream Aggregate API. > Dou you have any idea or workarounds to mitigate this exception? > > > > > 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ > StreamThread-4] o.a.k.s.p.i.InternalTopicManager : > Could not create internal topics: Existing internal topic > external-batch-request-store-repartition has invalid partitions. > Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up > invalid topics before processing. Retry #4 > > 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4" > org.apache.kafka.streams.errors.StreamsException: Could not create internal > topics. > 2017-06-21T06:48:31.491087557Z at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70) > 2017-06-21T06:48:31.491091661Z at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618) > 2017-06-21T06:48:31.491096794Z at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372) > 2017-06-21T06:48:31.491368662Z at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339) > 2017-06-21T06:48:31.491390576Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488) > 2017-06-21T06:48:31.491397476Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89) > 2017-06-21T06:48:31.491403757Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438) > 2017-06-21T06:48:31.491408328Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420) > 2017-06-21T06:48:31.491413053Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)