Hi Jon, The store you have created is a window store, so you need to use: kafkaStreams.store("AggStore",* QueryableStoreTypes.windowStore()*)
Thanks, Damian On Sun, 26 Mar 2017 at 14:14, Jon Yeargers <jon.yearg...@cedexis.com> wrote: Also - if I run this on two hosts - what does it imply if the response to 'streams.allMetadata()' from one host includes both instances but the other host only knows about itself? On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote: > If the '.state()' function returns "RUNNING" and I still get this > exception? > > On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska <eno.there...@gmail.com> > wrote: > >> Hi Jon, >> >> This is expected, see this: https://groups.google.com/foru >> m/?pli=1#!searchin/confluent-platform/migrated$20to$ >> 20another$20instance%7Csort:relevance/confluent-platform/ >> LglWC_dZDKw/qsPuCRT_DQAJ <https://groups.google.com/for >> um/?pli=1#!searchin/confluent-platform/migrated$20to$ >> 20another$20instance|sort:relevance/confluent-platform/ >> LglWC_dZDKw/qsPuCRT_DQAJ>. >> >> Thanks >> Eno >> > On 24 Mar 2017, at 20:51, Jon Yeargers <jon.yearg...@cedexis.com> >> wrote: >> > >> > I've setup a KTable as follows: >> > >> > KTable<Windowed<String>, String> outTable = sourceStream.groupByKey(). >> > reduce(rowReducer, >> > TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * >> > 1000).until(10 * 60 * 1000L), >> > "AggStore"); >> > >> > I can confirm its presence via 'streams.allMetadata()' (accessible >> through >> > a simple httpserver). >> > >> > When I call 'ReadOnlyKeyValueStore<String, String> store = >> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());' >> > >> > I get this exception: >> > >> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state >> > store, AggStore, may have migrated to another instance. >> > at >> > org.apache.kafka.streams.state.internals.QueryableStoreProvi >> der.getStore(QueryableStoreProvider.java:49) >> > at >> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378) >> > at >> > com.cedexis.videokafka.videohouraggregator.RequestHandler. >> handle(RequestHandler.java:97) >> > at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) >> > at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83) >> > at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82) >> > at >> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se >> rverImpl.java:675) >> > at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) >> > at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:6 >> 47) >> > at >> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server >> Impl.java:158) >> > at >> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431) >> > at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java >> :396) >> > at java.lang.Thread.run(Thread.java:745) >> > >> > >> > ... except.. there is only one instance.. running locally. >> >> >