Hello,

Thanks for your reply.

I use Kafka & KafkaStream version 0.10.2.0.
Between the runs, the number of partitions are not intentionally changed
programmatically or manually.

This topic:  "external-batch-request-store-repartition" is an internally
generated topic from this KafkaStream DSL
"aggregate"
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)



I use this API as follows:

...
.groupByKey()
.aggregate(...)
.toStream(...);


Please let me know if you need addiotional information.

Thanks,


2017-06-27 11:39 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:

> Hi there,
>
> Thanks for the report. What version of Kafka are you using? Also, between
> runs do you change the number of partitions for your topics? I’m trying to
> figure out how this problem happens, any information on what is changing in
> between runs is appreciated.
>
> Thanks,
> Eno
>
> > On Jun 27, 2017, at 8:52 AM, D Stephan <kafkastre...@gmail.com> wrote:
> >
> > Hello,
> >
> > When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have
> randomly
> > & frequently below exceptions:
> > In my opinion, it is not practical to clean up the invalid partitions
> > everydays.  For your information, this partition is an internal partition
> > that automatically created by KafkaStream Aggregate API.
> > Dou you have any idea or workarounds to mitigate this exception?
> >
> >
> >
> >
> > 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
> > StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
> > Could not create internal topics: Existing internal topic
> > external-batch-request-store-repartition has invalid partitions.
> > Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean
> up
> > invalid topics before processing. Retry #4
> >
> > 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
> > org.apache.kafka.streams.errors.StreamsException: Could not create
> internal
> > topics.
> > 2017-06-21T06:48:31.491087557Z at
> > org.apache.kafka.streams.processor.internals.InternalTopicManager.
> makeReady(InternalTopicManager.java:70)
> > 2017-06-21T06:48:31.491091661Z at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> prepareTopic(StreamPartitionAssignor.java:618)
> > 2017-06-21T06:48:31.491096794Z at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> assign(StreamPartitionAssignor.java:372)
> > 2017-06-21T06:48:31.491368662Z at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> performAssignment(ConsumerCoordinator.java:339)
> > 2017-06-21T06:48:31.491390576Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> onJoinLeader(AbstractCoordinator.java:488)
> > 2017-06-21T06:48:31.491397476Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$
> 1100(AbstractCoordinator.java:89)
> > 2017-06-21T06:48:31.491403757Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> > 2017-06-21T06:48:31.491408328Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> > 2017-06-21T06:48:31.491413053Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>
>

Reply via email to