Hello, Thanks for your reply.
I use Kafka & KafkaStream version 0.10.2.0. Between the runs, the number of partitions are not intentionally changed programmatically or manually. This topic: "external-batch-request-store-repartition" is an internally generated topic from this KafkaStream DSL "aggregate" https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String) I use this API as follows: ... .groupByKey() .aggregate(...) .toStream(...); Please let me know if you need addiotional information. Thanks, 2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>: > Hi there, > > Thanks for the report. What version of Kafka are you using? Also, between > runs do you change the number of partitions for your topics? I’m trying to > figure out how this problem happens, any information on what is changing in > between runs is appreciated. > > Thanks, > Eno > > > On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote: > > > > Hello, > > > > When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have > randomly > > & frequently below exceptions: > > In my opinion, it is not practical to clean up the invalid partitions > > everydays. For your information, this partition is an internal partition > > that automatically created by KafkaStream Aggregate API. > > Dou you have any idea or workarounds to mitigate this exception? > > > > > > > > > > 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ > > StreamThread-4] o.a.k.s.p.i.InternalTopicManager : > > Could not create internal topics: Existing internal topic > > external-batch-request-store-repartition has invalid partitions. > > Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean > up > > invalid topics before processing. Retry #4 > > > > 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4" > > org.apache.kafka.streams.errors.StreamsException: Could not create > internal > > topics. > > 2017-06-21T06:48:31.491087557Z at > > org.apache.kafka.streams.processor.internals.InternalTopicManager. > makeReady(InternalTopicManager.java:70) > > 2017-06-21T06:48:31.491091661Z at > > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor. > prepareTopic(StreamPartitionAssignor.java:618) > > 2017-06-21T06:48:31.491096794Z at > > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor. > assign(StreamPartitionAssignor.java:372) > > 2017-06-21T06:48:31.491368662Z at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > performAssignment(ConsumerCoordinator.java:339) > > 2017-06-21T06:48:31.491390576Z at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > onJoinLeader(AbstractCoordinator.java:488) > > 2017-06-21T06:48:31.491397476Z at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$ > 1100(AbstractCoordinator.java:89) > > 2017-06-21T06:48:31.491403757Z at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$ > JoinGroupResponseHandler.handle(AbstractCoordinator.java:438) > > 2017-06-21T06:48:31.491408328Z at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$ > JoinGroupResponseHandler.handle(AbstractCoordinator.java:420) > > 2017-06-21T06:48:31.491413053Z at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$ > CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764) > >