Yi,

Interestingly, I see these in the changelog topic (I am guessing these are window counts):

kafka-console-consumer.sh --zookeeper localhost:2181 --topic window-changelog
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
497
1436402016563
498
451
382
499
383
427

Code I have:

KeyValueIterator<String, String> i = store.all();
while(i.hasNext()){
    Entry <String, String> next = i.next();
    log.info("Processing Key", next.getKey());
}
i.close();

Interestingly I see that there are entries in store.all() as I see a bunch of messages on the console but I am unable to get the key or the value :(.

2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key
2015-07-08 17:36:46 WindowTask [INFO] Processing Key

On Mon, Jul 6, 2015 at 7:46 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Yi,
>
> I see incoming messages. I see that the counts are getting aggregated as well.
> But when I try to access it, I get null.
>
> - Shekar
>
> On Mon, Jul 6, 2015 at 4:50 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Shekar,
>>
>> Did you take a look at the stats to see:
>>
>> 1) Is there any incoming messages?
>> 2) Is there any messages in the changelog topic?
>>
>> Could you also try to change the log4j level to DEBUG to see whether we can
>> see something in the log?
>>
>> Thanks!
>>
>> -Yi
>>
>> On Mon, Jul 6, 2015 at 4:43 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>
>> > Martin,
>> >
>> > As seen below, I have only 1 partition. What else could be wrong?
>> >
>> > $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser
>> >
>> > Topic:parser PartitionCount:1 ReplicationFactor:1 Configs:
>> >
>> > Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0
>> >
>> > - Shekar
>> >
>> > On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com> wrote:
>> >
>> > > Hi Shekar,
>> > >
>> > > The store.all() iterator ought to give you the entire contents of the
>> > > store. However, note that each partition of the input topic results in a
>> > > separate StreamTask instance, which in turn has a separate store. So there
>> > > will be as many stores as there are input partitions. Perhaps you're not
>> > > seeing data appear because you're writing it in one partition, and trying
>> > > to read it in another.
>> > >
>> > > The partitioning also answers what happens when you run your job on a
>> > > cluster. Different partitions may be processed on different nodes, but all
>> > > the messages in one input topic partition always go to the same StreamTask
>> > > (and thus the same store). Thus, whether you have skew or not depends
>> > > entirely on how you partition your input topic.
>> > >
>> > > Regarding atomic deletion: each StreamTask is single-threaded, so you
>> > > don't have to worry about concurrency. If you want to delete all keys in
>> > > the store, you can do so.
>> > >
>> > > Martin
>> > >
>> > > On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
>> > >
>> > > > Any answer on how to get all the kv values and reinitialise the kv store?
>> > > >
>> > > > Had one more question on implementing sliding window.
>> > > >
>> > > > If i use a kv store like rocksdb, and I use yarn (say 3 node cluster), the
>> > > > job that it runs to aggregate gets distributed as well and I am guessing
>> > > > the aggregation numbers get skewed? Is that a right assessment?
>> > > >
>> > > > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> > > >
>> > > >> Also, next.getValue() or next.getKey() does not yield anything.
>> > > >>
>> > > >> KeyValueIterator<String, String> i = store.all();
>> > > >> while(i.hasNext()){
>> > > >>     Entry <String, String> next = i.next();
>> > > >>     log.info("Removed Key", next.getValue());
>> > > >> }
>> > > >>
>> > > >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> > > >>
>> > > >>> Yi,
>> > > >>>
>> > > >>> There is no exception. I want to do couple of things in the window.
>> > > >>>
>> > > >>> - Get all the keys and values and publish to another store (like
>> > > >>>   graphite) as a list
>> > > >>> - Remove all entries.
>> > > >>>
>> > > >>> I can iterate thro the list later but I want to be able to get all kv
>> > > >>> values and delete all of them in an atomic operation.
>> > > >>>
>> > > >>> How do I do these operations on the kv store?
>> > > >>>
>> > > >>> - S
>> > > >>>
>> > > >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> > > >>>
>> > > >>>> Hi, Shekar,
>> > > >>>>
>> > > >>>> Sorry I was not able to follow up w/ you in time. It is great that you
>> > > >>>> have found the configure problem and made it work!
>> > > >>>>
>> > > >>>> As for the exception on the iterator, could you send us the log w/ the
>> > > >>>> exception?
>> > > >>>>
>> > > >>>> Thanks!
>> > > >>>>
>> > > >>>> -Yi
>> > > >>>>
>> > > >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> > > >>>>
>> > > >>>>> Yi,
>> > > >>>>>
>> > > >>>>> Looks like it is working now. There was a redundant line in the config.
>> > > >>>>>
>> > > >>>>> I am able to initialize kv store and add values.
>> > > >>>>> In the window code, I am unable to retrieve them and mark them as 0.
>> > > >>>>>
>> > > >>>>> Here is my window code:
>> > > >>>>>
>> > > >>>>> public void window(MessageCollector collector,
>> > > >>>>>         TaskCoordinator coordinator) {
>> > > >>>>>     //store.delete(appName);
>> > > >>>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
>> > > >>>>>     KeyValueIterator<String, String> i = store.all();
>> > > >>>>>     while(i.hasNext()){
>> > > >>>>>         Entry <String, String> next = i.next();
>> > > >>>>>         log.info("Trying to remove Key", next.getKey());
>> > > >>>>>         //i.remove();
>> > > >>>>>     }
>> > > >>>>>     eventsSeen = 0;
>> > > >>>>>     i.close();
>> > > >>>>> }
>> > > >>>>>
>> > > >>>>> How do I retrieve the key and is there a way to remove it? i.remove throws
>> > > >>>>> an exception.
>> > > >>>>>
>> > > >>>>> - Shekar
>> > > >>>>>
>> > > >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> > > >>>>>
>> > > >>>>>> Yi,
>> > > >>>>>>
>> > > >>>>>> Here is my config file:
>> > > >>>>>> http://pastebin.com/Kf3C9E0h
>> > > >>>>>>
>> > > >>>>>> - S
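
For reference, the "get all kv values, then delete them" step discussed in the thread can be sketched as two passes. Since each StreamTask is single-threaded (per Martin's reply), nothing else touches the store between the passes. This is only a rough sketch under assumptions: a plain java.util.TreeMap stands in for the task's key-value store, and the class name DrainStoreSketch, the method drain, and the sample keys are made up for illustration. It is not the Samza API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DrainStoreSketch {

    // Copies every entry out of the store, then deletes the copied keys.
    // TreeMap is a hypothetical stand-in for the task's key-value store.
    static List<Map.Entry<String, String>> drain(TreeMap<String, String> store) {
        List<Map.Entry<String, String>> snapshot = new ArrayList<>();
        // Pass 1: read everything without mutating the store.
        for (Map.Entry<String, String> e : store.entrySet()) {
            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        // Pass 2: delete by key once iteration is finished.
        for (Map.Entry<String, String> e : snapshot) {
            store.remove(e.getKey());
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("app1", "497"); // made-up sample data
        store.put("app2", "498");
        for (Map.Entry<String, String> e : drain(store)) {
            // The key and value are concatenated into the message text.
            System.out.println("Processing Key " + e.getKey() + " = " + e.getValue());
        }
        System.out.println("Entries left: " + store.size());
    }
}
```

A side note on the logging calls quoted above: if log is an SLF4J Logger, the message string needs a {} placeholder for the argument to be printed, e.g. log.info("Processing Key {}", next.getKey()). Without a placeholder the argument is dropped, which would be consistent with the bare "Processing Key" lines in the output.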