Hi Jon,
The store you have created is a window store, so you need to use:

kafkaStreams.store("AggStore", QueryableStoreTypes.windowStore())
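
For context on why the store type matters: a windowed store keeps one entry per key per window, so it is read through a ReadOnlyWindowStore with fetch(key, timeFrom, timeTo) over a time range rather than with a single get(key). With the 5-minute windows advancing by 1 minute from your topology, each record should land in up to five overlapping windows. The sketch below is plain Java with no Kafka dependency. HoppingWindows and windowStartsFor are hypothetical names used only for illustration, and the code assumes window starts are aligned to multiples of the advance interval counted from epoch 0.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: HoppingWindows is a hypothetical helper, not a Kafka Streams class.
class HoppingWindows {

    // Returns the start times (in ms) of every hopping window [start, start + sizeMs)
    // that contains timestampMs. Assumes starts are multiples of advanceMs from epoch 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 5 * 60 * 1000L;     // window size used in the topology
        long advanceMs = 1 * 60 * 1000L;  // advance interval used in the topology
        long timestampMs = 7 * 60 * 1000L + 30_000L; // a record at 7m30s

        // prints [180000, 240000, 300000, 360000, 420000]
        System.out.println(windowStartsFor(timestampMs, sizeMs, advanceMs));
    }
}
```

Each of those start times would be a separate entry for the same key in "AggStore", which is why a key-value view of the store is not available.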

Thanks,
Damian

On Sun, 26 Mar 2017 at 14:14, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

Also - if I run this on two hosts - what does it imply if the response to
'streams.allMetadata()' from one host includes both instances but the other
host only knows about itself?

On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> What if the '.state()' function returns "RUNNING" and I still get this
> exception?
>
> On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Jon,
>>
>> This is expected, see this:
>> https://groups.google.com/forum/?pli=1#!searchin/confluent-platform/migrated$20to$20another$20instance%7Csort:relevance/confluent-platform/LglWC_dZDKw/qsPuCRT_DQAJ
>>
>> Thanks
>> Eno
>> > On 24 Mar 2017, at 20:51, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> >
>> > I've set up a KTable as follows:
>> >
>> > KTable<Windowed<String>, String> outTable = sourceStream.groupByKey()
>> >         .reduce(rowReducer,
>> >                 TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * 1000).until(10 * 60 * 1000L),
>> >                 "AggStore");
>> >
>> > I can confirm its presence via 'streams.allMetadata()' (accessible through
>> > a simple httpserver).
>> >
>> > When I call 'ReadOnlyKeyValueStore<String, String> store =
>> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
>> >
>> > I get this exception:
>> >
>> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, AggStore, may have migrated to another instance.
>> >        at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
>> >        at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
>> >        at com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(RequestHandler.java:97)
>> >        at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >        at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>> >        at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>> >        at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>> >        at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> >        at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>> >        at sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:158)
>> >        at sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
>> >        at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:396)
>> >        at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > ... except there is only one instance, running locally.
>>
>>
>
