Hi,

Could you check if this helps:
https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
 
<https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition>

Thanks
Eno
> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com> wrote:
> 
> Hello Eno,
> Thanks for considering the question.
> 
> How I am creating the state stores:
> 
> StateStoreSupplier stateStoreSupplier =
> StateStorStores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> TopologyBuilder builder = ...
> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
> 
> The Error Message with stack trace is as follows:
> 
> 2017-08-04 17:11:23,184 53205
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
> active task -727063541_0 with assigned partitions [testing-topic-0]
> 
> 2017-08-04 17:11:23,185 53206
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
> assignment took 41778 ms.
> current active tasks: []
> current standby tasks: []
> 
> 2017-08-04 17:11:23,187 53208
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group testing-2 failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
> change log (testing-2-testing-2-store-changelog) does not contain partition
> 0
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> 
> I hope this shares more light on the situation.
> Thanks
> 
> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com 
> <mailto:eno.there...@gmail.com>> wrote:
> 
>> Hi Anish,
>> 
>> Could you give more info on how you create the state stores in your code?
>> Also could you copy-paste the exact error message from the log?
>> 
>> Thanks
>> Eno
>>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com>
>> wrote:
>>> 
>>> I have a new application, call it streamsApp with state stores S1 and S2.
>>> So, according to the documentation, upon the first time startup, the
>>> application should've created the changelog topics
>> streamsApp-S1-changelog
>>> and streamsApp-S2-changelog. But I see that these topics are not created.
>>> Also, the application throws an error that it couldn't find any partition
>>> for topics *streamsApp-S1-changelog and streamsApp-S2-changelog *and then
>>> exits*. *To get it working, I manually created the topics, but I am
>>> skeptical because the docs say that this convention might change any
>> time.
>>> I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
>>> protocol set to v0.10.0. Am I missing something?
>>> --
>>> 
>>> Regards,
>>> Anish Samir Mashankar
>>> R&D Engineer
>>> System Insights
>>> +91-9789870733 <+91%2097898%2070733>
>> 
>> --
> 
> Regards,
> Anish Samir Mashankar
> R&D Engineer
> System Insights
> +91-9789870733

Reply via email to