Thanks. I believe we’ve addressed this issue in 0.10.2.1. Any chance you could 
try that?

Thanks
Eno
> On Jun 27, 2017, at 11:14 AM, D Stephan <kafkastre...@gmail.com> wrote:
> 
> Hello,
> 
> Thanks for your reply.
> 
> I use Kafka & KafkaStream version 0.10.2.0.
> Between the runs, the number of partitions is not intentionally changed
> programmatically or manually.
> 
> The topic "external-batch-request-store-repartition" is an internal topic
> generated by the KafkaStream DSL "aggregate" method:
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> 
> 
> 
> I use this API as follows:
> 
> ...
> .groupByKey()
> .aggregate(...)
> .toStream(...);
> 
> 
> Please let me know if you need additional information.
> 
> Thanks,
> 
> 
> 2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
> 
>> Hi there,
>> 
>> Thanks for the report. What version of Kafka are you using? Also, between
>> runs do you change the number of partitions for your topics? I’m trying to
>> figure out how this problem happens, so any information on what is changing
>> in between runs is appreciated.
>> 
>> Thanks,
>> Eno
>> 
>>> On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote:
>>> 
>>> Hello,
>>> 
>>> When I use the KafkaStreams DSL groupByKey and aggregate APIs, I randomly
>>> and frequently get the exceptions below.
>>> In my opinion, it is not practical to clean up the invalid partitions
>>> every day. For your information, this partition is an internal partition
>>> that is automatically created by the KafkaStreams aggregate API.
>>> Do you have any ideas or workarounds to mitigate this exception?
>>> 
>>> 
>>> 
>>> 
>>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [ StreamThread-4] o.a.k.s.p.i.InternalTopicManager : Could not create internal topics: Existing internal topic external-batch-request-store-repartition has invalid partitions. Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #4
>>> 
>>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
>>> 2017-06-21T06:48:31.491087557Z at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
>>> 2017-06-21T06:48:31.491091661Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
>>> 2017-06-21T06:48:31.491096794Z at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
>>> 2017-06-21T06:48:31.491368662Z at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
>>> 2017-06-21T06:48:31.491390576Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
>>> 2017-06-21T06:48:31.491397476Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
>>> 2017-06-21T06:48:31.491403757Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
>>> 2017-06-21T06:48:31.491408328Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
>>> 2017-06-21T06:48:31.491413053Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>> 
>> 
