Thanks. I believe we’ve addressed this issue in 0.10.2.1. Any chance you could try that?
Thanks
Eno

> On Jun 27, 2017, at 11:14 AM, D Stephan <kafkastre...@gmail.com> wrote:
>
> Hello,
>
> Thanks for your reply.
>
> I use Kafka & KafkaStream version 0.10.2.0.
> Between the runs, the number of partitions is not intentionally changed,
> either programmatically or manually.
>
> This topic: "external-batch-request-store-repartition" is an internally
> generated topic from this KafkaStream DSL
> "aggregate"
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
>
> I use this API as follows:
>
> ...
> .groupByKey()
> .aggregate(...)
> .toStream(...);
>
> Please let me know if you need additional information.
>
> Thanks,
>
>
> 2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
>
>> Hi there,
>>
>> Thanks for the report. What version of Kafka are you using? Also, between
>> runs do you change the number of partitions for your topics? I’m trying to
>> figure out how this problem happens, any information on what is changing in
>> between runs is appreciated.
>>
>> Thanks,
>> Eno
>>
>>> On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> When I use the KafkaStreams DSL GroupByKey and Aggregate APIs, I randomly
>>> and frequently get the exceptions below.
>>> In my opinion, it is not practical to clean up the invalid partitions
>>> every day. For your information, this partition is an internal partition
>>> that is automatically created by the KafkaStream Aggregate API.
>>> Do you have any ideas or workarounds to mitigate this exception?
>>>
>>>
>>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
>>> Could not create internal topics: Existing internal topic
>>> external-batch-request-store-repartition has invalid partitions.
>>> Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up
>>> invalid topics before processing. Retry #4
>>>
>>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
>>> org.apache.kafka.streams.errors.StreamsException: Could not create internal
>>> topics.
>>> 2017-06-21T06:48:31.491087557Z at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
>>> 2017-06-21T06:48:31.491091661Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
>>> 2017-06-21T06:48:31.491096794Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
>>> 2017-06-21T06:48:31.491368662Z at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
>>> 2017-06-21T06:48:31.491390576Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
>>> 2017-06-21T06:48:31.491397476Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
>>> 2017-06-21T06:48:31.491403757Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
>>> 2017-06-21T06:48:31.491408328Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
>>> 2017-06-21T06:48:31.491413053Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)