Hi,

Could you check if this helps:
https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition

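When comparing the topic you created by hand against the one named in the error, it may help to write the naming pattern down. The sketch below only mirrors the pattern visible in the log ("testing-2-testing-2-store-changelog" for application id "testing-2" and store "testing-2-store"). ChangelogNames is a hypothetical helper, not part of Kafka Streams, and the pattern is an internal convention that may change between releases.

```java
// Sketch only: reproduces the "<application.id>-<store name>-changelog"
// pattern seen in the error message. ChangelogNames is a hypothetical
// helper for illustration, not a Kafka Streams class.
final class ChangelogNames {
    private ChangelogNames() {}

    // Builds the changelog topic name expected for a given application id and store name.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Names taken from the log and from the original question.
        System.out.println(changelogTopic("testing-2", "testing-2-store"));
        System.out.println(changelogTopic("streamsApp", "S1"));
    }
}
```

The first call prints the same topic name that appears in the StreamsException, so a manually created topic can be checked against it, along with whether it actually has the partition the task is asking for (partition 0 here).
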
Thanks
Eno

> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com> wrote:
>
> Hello Eno,
> Thanks for considering the question.
>
> Here is how I am creating the state stores:
>
> StateStoreSupplier stateStoreSupplier =
>     Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> TopologyBuilder builder = ...
> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
>
> The error message with stack trace is as follows:
>
> 2017-08-04 17:11:23,184 53205 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created active task -727063541_0 with assigned partitions [testing-topic-0]
>
> 2017-08-04 17:11:23,185 53206 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition assignment took 41778 ms.
>     current active tasks: []
>     current standby tasks: []
>
> 2017-08-04 17:11:23,187 53208 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group testing-2 failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's change log (testing-2-testing-2-store-changelog) does not contain partition 0
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> I hope this sheds more light on the situation.
> Thanks
>
> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Anish,
>>
>> Could you give more info on how you create the state stores in your code?
>> Also, could you copy-paste the exact error message from the log?
>>
>> Thanks
>> Eno
>>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com> wrote:
>>>
>>> I have a new application, call it streamsApp, with state stores S1 and S2.
>>> So, according to the documentation, on first startup the application
>>> should have created the changelog topics streamsApp-S1-changelog and
>>> streamsApp-S2-changelog. But I see that these topics are not created.
>>> Also, the application throws an error that it couldn't find any partition
>>> for the topics streamsApp-S1-changelog and streamsApp-S2-changelog, and
>>> then exits. To get it working, I manually created the topics, but I am
>>> skeptical because the docs say that this naming convention might change
>>> at any time.
>>> I am using Kafka Streams v0.11 with a Kafka broker v0.11, but with the
>>> message protocol set to v0.10.0. Am I missing something?
>>> --
>>>
>>> Regards,
>>> Anish Samir Mashankar
>>> R&D Engineer
>>> System Insights
>>> +91-9789870733
>>
>> --
>
> Regards,
> Anish Samir Mashankar
> R&D Engineer
> System Insights
> +91-9789870733