Martin,

As seen below, I have only 1 partition. What else could be wrong?

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser
Topic:parser    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: parser    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

- Shekar

On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com> wrote:

> Hi Shekar,
>
> The store.all() iterator ought to give you the entire contents of the
> store. However, note that each partition of the input topic results in a
> separate StreamTask instance, which in turn has a separate store. So there
> will be as many stores as there are input partitions. Perhaps you're not
> seeing data appear because you're writing it in one partition, and trying
> to read it in another.
>
> The partitioning also answers what happens when you run your job on a
> cluster. Different partitions may be processed on different nodes, but all
> the messages in one input topic partition always go to the same StreamTask
> (and thus the same store). Thus, whether you have skew or not depends
> entirely on how you partition your input topic.
>
> Regarding atomic deletion: each StreamTask is single-threaded, so you
> don't have to worry about concurrency. If you want to delete all keys in
> the store, you can do so.
>
> Martin
>
> On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Any answer on how to get all the kv values and reinitialise the kv store?
> >
> > Had one more question on implementing a sliding window.
> >
> > If I use a kv store like rocksdb, and I use yarn (say 3 node cluster), the
> > job that it runs to aggregate gets distributed as well and I am guessing
> > the aggregation numbers get skewed? Is that a right assessment?
> >
> > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >
> >> Also, next.getValue() or next.getKey() does not yield anything.
> >>
> >> KeyValueIterator<String, String> i = store.all();
> >> while (i.hasNext()) {
> >>     Entry<String, String> next = i.next();
> >>     log.info("Removed Key", next.getValue());
> >> }
> >>
> >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>
> >>> Yi,
> >>>
> >>> There is no exception. I want to do a couple of things in the window.
> >>>
> >>> - Get all the keys and values and publish to another store (like
> >>>   graphite) as a list
> >>> - Remove all entries.
> >>>
> >>> I can iterate through the list later but I want to be able to get all kv
> >>> values and delete all of them in an atomic operation.
> >>>
> >>> How do I do these operations on the kv store?
> >>>
> >>> - S
> >>>
> >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >>>
> >>>> Hi, Shekar,
> >>>>
> >>>> Sorry I was not able to follow up w/ you in time. It is great that you
> >>>> have found the configuration problem and made it work!
> >>>>
> >>>> As for the exception on the iterator, could you send us the log w/ the
> >>>> exception?
> >>>>
> >>>> Thanks!
> >>>>
> >>>> -Yi
> >>>>
> >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>>>
> >>>>> Yi,
> >>>>>
> >>>>> Looks like it is working now. There was a redundant line in the config.
> >>>>>
> >>>>> I am able to initialize kv store and add values.
> >>>>> In the window code, I am unable to retrieve them and mark them as 0.
> >>>>>
> >>>>> Here is my window code:
> >>>>>
> >>>>> public void window(MessageCollector collector,
> >>>>>                    TaskCoordinator coordinator) {
> >>>>>     //store.delete(appName);
> >>>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
> >>>>>     KeyValueIterator<String, String> i = store.all();
> >>>>>     while (i.hasNext()) {
> >>>>>         Entry<String, String> next = i.next();
> >>>>>         log.info("Trying to remove Key", next.getKey());
> >>>>>         //i.remove();
> >>>>>     }
> >>>>>     eventsSeen = 0;
> >>>>>     i.close();
> >>>>> }
> >>>>>
> >>>>> How do I retrieve the key and is there a way to remove it? i.remove
> >>>>> throws an exception.
> >>>>>
> >>>>> - Shekar
> >>>>>
> >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>>>>
> >>>>>> Yi,
> >>>>>>
> >>>>>> Here is my config file:
> >>>>>> http://pastebin.com/Kf3C9E0h
> >>>>>>
> >>>>>> - S
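
A note on the "get everything, then delete everything" step discussed in the thread: the following is only a rough sketch, not Samza code. It uses a plain java.util.TreeMap as a stand-in for the task-local store, and the class name DrainStoreSketch and the method drain are hypothetical. The idea is to copy every entry in one pass and delete by key in a second pass, instead of calling remove() on the iterator. Because each StreamTask is single-threaded, nothing else touches the store between the two passes. Separately, if log is an SLF4J logger (an assumption), a call such as log.info("Removed Key", value) prints only the fixed text, because the message has no {} placeholder; that may be why the key and value seemed to be empty.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for the task-local key-value store.
final class DrainStoreSketch {

    // Copies all entries out of the store, then deletes them by key.
    static List<Map.Entry<String, String>> drain(Map<String, String> store) {
        // Pass 1: snapshot every entry so it can be published as one list.
        List<Map.Entry<String, String>> snapshot = new ArrayList<>();
        for (Map.Entry<String, String> e : store.entrySet()) {
            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        // Pass 2: delete by key once iteration is finished,
        // rather than removing through the iterator.
        for (Map.Entry<String, String> e : snapshot) {
            store.remove(e.getKey());
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, String> store = new TreeMap<>();
        store.put("app1", "3");
        store.put("app2", "7");
        for (Map.Entry<String, String> e : drain(store)) {
            // With SLF4J this would be log.info("Removed key {} = {}", key, value).
            System.out.println("Removed key " + e.getKey() + " = " + e.getValue());
        }
        System.out.println("Store empty: " + store.isEmpty());
    }
}
```

In a real job the first pass would walk store.all() and close the iterator before the second pass calls store.delete(key) for each collected key.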