Hi Adam,

If you increase the number of partitions in the topic "topic1" after the
state store is created, you'd need to manually increase the number of
partitions in the "app1-store1-changelog" topic as well.  Or remove the
topic and let KS recreate it next run.  But, either way, hopefully you
don't need the data in it, 'cause it won't match the partitioning of the
input topic. :-)

Mathieu


On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote:

> Hello,
>
> I have a simple example (or so it would seem) of a stream processor which
> uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
> this starts up without problems for a topic with 1 partition. However, if I
> create a topic with 3 partitions I’m getting the following exception
> shortly after the init() method of the Processor is called (init completes
> without problems):
>
> 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread
> [StreamThread-1] Failed to create an active task 0_1:
> org.apache.kafka.streams.errors.StreamsException: task [0_1] Store
> store1's change log (app1-store1-changelog) does not contain partition 1
>         at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.register(ProcessorStateManager.java:185)
>         at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.
> init(RocksDBStore.java:169)
>         at org.apache.kafka.streams.state.internals.
> MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:81)
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:119)
>
> The code is essentially:
>
> StateStoreSupplier testStore = Stores.create("store1")
>         .withStringKeys()
>         .withStringValues()
>         .persistent()
>         .build();
>
> TopologyBuilder builder = new TopologyBuilder();
>
> builder.addSource("source", "topic1")
>         .addProcessor("process", TestProcessor::new, "source")
>         .addStateStore(testStore, "process”);
>
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
>
> public static class TestProcessor implements Processor<String, String> {
>     @Override
>     public void init(ProcessorContext context) {
>         context.getStateStore("store1");
>         System.out.println("Initialized");
>     }
> }
>
> Full source here: https://gist.github.com/adamw/
> b5c69f86d8688da23afebd095683faaa <https://gist.github.com/>
> Full stack trace: https://gist.github.com/adamw/
> f72cdf0c2f0d67425ed9c103a327f3bf <https://gist.github.com/adamw/
> f72cdf0c2f0d67425ed9c103a327f3bf>
>
> I would be grateful for any pointers!
> Adam
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski>
> http://www.softwaremill.com <http://www.softwaremill.com/>
> http://www.warski.org <http://www.warski.org/>
>

Reply via email to