Yi,

I see incoming messages, and I can see that the counts are being aggregated as well. But when I try to access them, I get null.

- Shekar

On Mon, Jul 6, 2015 at 4:50 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Shekar,
>
> Did you take a look at the stats to see:
>
> 1) Are there any incoming messages?
> 2) Are there any messages in the changelog topic?
>
> Could you also try to change the log4j level to DEBUG to see whether we can
> see something in the log?
>
> Thanks!
>
> -Yi
>
> On Mon, Jul 6, 2015 at 4:43 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Martin,
> >
> > As seen below, I have only 1 partition. What else could be wrong?
> >
> > $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser
> >
> > Topic:parser PartitionCount:1 ReplicationFactor:1 Configs:
> >
> > Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> >
> > - Shekar
> >
> > On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > The store.all() iterator ought to give you the entire contents of the
> > > store. However, note that each partition of the input topic results in
> > > a separate StreamTask instance, which in turn has a separate store. So
> > > there will be as many stores as there are input partitions. Perhaps
> > > you're not seeing data appear because you're writing it in one
> > > partition, and trying to read it in another.
> > >
> > > The partitioning also answers what happens when you run your job on a
> > > cluster. Different partitions may be processed on different nodes, but
> > > all the messages in one input topic partition always go to the same
> > > StreamTask (and thus the same store). Thus, whether you have skew or
> > > not depends entirely on how you partition your input topic.
> > >
> > > Regarding atomic deletion: each StreamTask is single-threaded, so you
> > > don't have to worry about concurrency. If you want to delete all keys
> > > in the store, you can do so.
> > >
> > > Martin
> > >
> > > On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > Any answer on how to get all the kv values and reinitialise the kv
> > > > store?
> > > >
> > > > Had one more question on implementing a sliding window.
> > > >
> > > > If I use a kv store like rocksdb, and I use yarn (say a 3 node
> > > > cluster), the job that it runs to aggregate gets distributed as well
> > > > and I am guessing the aggregation numbers get skewed? Is that a right
> > > > assessment?
> > > >
> > > > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com>
> > > > wrote:
> > > >
> > > >> Also, next.getValue() or next.getKey() does not yield anything.
> > > >>
> > > >> KeyValueIterator<String, String> i = store.all();
> > > >> while (i.hasNext()) {
> > > >>     Entry<String, String> next = i.next();
> > > >>     log.info("Removed Key", next.getValue());
> > > >> }
> > > >>
> > > >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Yi,
> > > >>>
> > > >>> There is no exception. I want to do a couple of things in the window.
> > > >>>
> > > >>> - Get all the keys and values and publish to another store (like
> > > >>>   graphite) as a list
> > > >>> - Remove all entries.
> > > >>>
> > > >>> I can iterate through the list later but I want to be able to get
> > > >>> all kv values and delete all of them in an atomic operation.
> > > >>>
> > > >>> How do I do these operations on the kv store?
> > > >>>
> > > >>> - S
> > > >>>
> > > >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi, Shekar,
> > > >>>>
> > > >>>> Sorry I was not able to follow up w/ you in time. It is great that
> > > >>>> you have found the configuration problem and made it work!
> > > >>>>
> > > >>>> As for the exception on the iterator, could you send us the log w/
> > > >>>> the exception?
> > > >>>>
> > > >>>> Thanks!
> > > >>>>
> > > >>>> -Yi
> > > >>>>
> > > >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Yi,
> > > >>>>>
> > > >>>>> Looks like it is working now. There was a redundant line in the
> > > >>>>> config.
> > > >>>>>
> > > >>>>> I am able to initialize kv store and add values.
> > > >>>>> In the window code, I am unable to retrieve them and mark them as 0.
> > > >>>>>
> > > >>>>> Here is my window code:
> > > >>>>>
> > > >>>>> public void window(MessageCollector collector,
> > > >>>>>                    TaskCoordinator coordinator) {
> > > >>>>>     //store.delete(appName);
> > > >>>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
> > > >>>>>     KeyValueIterator<String, String> i = store.all();
> > > >>>>>     while (i.hasNext()) {
> > > >>>>>         Entry<String, String> next = i.next();
> > > >>>>>         log.info("Trying to remove Key", next.getKey());
> > > >>>>>         //i.remove();
> > > >>>>>     }
> > > >>>>>     eventsSeen = 0;
> > > >>>>>     i.close();
> > > >>>>> }
> > > >>>>>
> > > >>>>> How do I retrieve the key and is there a way to remove it? i.remove
> > > >>>>> throws an exception.
> > > >>>>>
> > > >>>>> - Shekar
> > > >>>>>
> > > >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Yi,
> > > >>>>>>
> > > >>>>>> Here is my config file:
> > > >>>>>> http://pastebin.com/Kf3C9E0h
> > > >>>>>>
> > > >>>>>> - S
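
For reference, the "read everything, then clear" pattern discussed in this thread might be sketched roughly as below. This is only an illustration under stated assumptions, not Samza code: a plain TreeMap stands in for the task-local key-value store so that the example runs on its own, and the names WindowFlushSketch and drain are made up for this sketch. In a real task the corresponding calls would presumably be store.all() for the iteration and store.delete(key) for the removal, with the iterator closed before deleting. Because each StreamTask is single-threaded (as Martin notes), nothing else touches the store between the read and the delete. One further possibility worth checking: if log is an SLF4J-style logger, a call such as log.info("Removed Key", next.getValue()) has no {} placeholder in the message, so the value would never appear in the log even when the store does contain data.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowFlushSketch {

    // Reads every entry, then deletes the keys that were read.
    // Returns the entries as "key=value" strings so the caller can publish them.
    // The Map parameter is a stand-in (assumption) for the task's key-value store.
    static List<String> drain(Map<String, String> kv) {
        List<String> snapshot = new ArrayList<>();
        List<String> keys = new ArrayList<>();

        // Pass 1: iterate and collect. No removal happens while iterating.
        Iterator<Map.Entry<String, String>> it = kv.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> next = it.next();
            snapshot.add(next.getKey() + "=" + next.getValue());
            keys.add(next.getKey());
        }

        // Pass 2: delete by key once the iteration is finished.
        for (String key : keys) {
            kv.remove(key);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: per-application counts stored as strings.
        Map<String, String> store = new TreeMap<>();
        store.put("appA", "3");
        store.put("appB", "7");

        List<String> published = drain(store);

        // Concatenation is used here so the values are always printed,
        // regardless of how a logging framework treats extra arguments.
        System.out.println("Published: " + published);
        System.out.println("Entries left: " + store.size());
    }
}
```

Collecting the keys first and deleting afterwards avoids calling remove() on the iterator, which is the call that threw an exception in the window code quoted above.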