Yi,

I see incoming messages. I see that the counts are getting aggregated as
well.
But when I try to access it, I get null.

- Shekar

On Mon, Jul 6, 2015 at 4:50 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Shekar,
>
> Did you take a look at the stats to see:
>
> 1) Is there any incoming messages?
> 2) Is there any messages in the changelog topic?
>
> Could you also try to change the log4j level to DEBUG to see whether we can
> see something in the log?
>
> Thanks!
>
> -Yi
>
> On Mon, Jul 6, 2015 at 4:43 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Martin,
> >
> > As seen below, I have only 1 partition. What else could be wrong?
> >
> > *$ *kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser
> >
> > Topic:parser PartitionCount:1 ReplicationFactor:1 Configs:
> >
> > Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> >
> > - Shekar
> >
> >
> >
> >
> > On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > The store.all() iterator ought to give you the entire contents of the
> > > store. However, note that each partition of the input topic results in
> a
> > > separate StreamTask instance, which in turn has a separate store. So
> > there
> > > will be as many stores as there are input partitions. Perhaps you're
> not
> > > seeing data appear because you're writing it in one partition, and
> trying
> > > to read it in another.
> > >
> > > The partitioning also answers what happens when you run your job on a
> > > cluster. Different partitions may be processed on different nodes, but
> > all
> > > the messages in one input topic partition always go to the same
> > StreamTask
> > > (and thus the same store). Thus, whether you have skew or not depends
> > > entirely on how you partition your input topic.
> > >
> > > Regarding atomic deletion: each StreamTask is single-threaded, so you
> > > don't have to worry about concurrency. If you want to delete all keys
> in
> > > the store, you can do so.
> > >
> > > Martin
> > >
> > > On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > Any answer on how to get all the kv values and reinitialise the kv
> > store?
> > > >
> > > > Had one more question on implementing sliding window.
> > > >
> > > > If i use a kv store like rocksdb, and I use yarn (say 3 node
> cluster),
> > > the
> > > > job that it runs to aggregate gets distributed as well and I am
> > guessing
> > > > the aggregation numbers get skewed? Is that a right assessment?
> > > >
> > > > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com>
> > wrote:
> > > >
> > > >> Also, next.getValue() or next.getKey() does not yield anything.
> > > >>
> > > >>   KeyValueIterator<String, String> i = store.all();
> > > >>
> > > >>  while(i.hasNext()){
> > > >>
> > > >>       Entry <String, String> next = i.next();
> > > >>
> > > >>       log.info("Removed Key", next.getValue());
> > > >>
> > > >>  }
> > > >>
> > > >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com>
> > > wrote:
> > > >>
> > > >>> Yi,
> > > >>>
> > > >>> There is no exception. I want to do couple of things in the window.
> > > >>>
> > > >>> - Get all the keys and values and publish to another store (like
> > > >>> graphite) as a list
> > > >>> - Remove all entries.
> > > >>>
> > > >>> I can iterate thro the list later but I want to be able to get all
> kv
> > > >>> values and delete all of them in an atomic operation.
> > > >>>
> > > >>> How do I do these operations on the kv store?
> > > >>>
> > > >>> - S
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com>
> wrote:
> > > >>>
> > > >>>> Hi, Shekar,
> > > >>>>
> > > >>>> Sorry I was not able to follow up w/ you in time. It is great that
> > you
> > > >>>> have
> > > >>>> found the configure problem and made it work!
> > > >>>>
> > > >>>> As for the exception on the iterator, could you send us the log w/
> > the
> > > >>>> exception?
> > > >>>>
> > > >>>> Thanks!
> > > >>>>
> > > >>>> -Yi
> > > >>>>
> > > >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com>
> > > wrote:
> > > >>>>
> > > >>>>> Yi,
> > > >>>>>
> > > >>>>> Looks like it is working now. There was a redundant line in the
> > > config.
> > > >>>>>
> > > >>>>> I am able to initialize kv store and add values.
> > > >>>>> In the window code, I am unable to retrieve them and mark them as
> > 0.
> > > >>>>>
> > > >>>>> Here is my window code:
> > > >>>>>
> > > >>>>> public void window(MessageCollector collector,
> > > >>>>>
> > > >>>>>      TaskCoordinator coordinator) {
> > > >>>>>
> > > >>>>>  //store.delete(appName);
> > > >>>>>
> > > >>>>>  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> > > >>>> eventsSeen));
> > > >>>>>
> > > >>>>>  KeyValueIterator<String, String> i = store.all();
> > > >>>>>
> > > >>>>>  while(i.hasNext()){
> > > >>>>>
> > > >>>>>   Entry <String, String> next = i.next();
> > > >>>>>
> > > >>>>>   log.info("Trying to remove Key", next.getKey());
> > > >>>>>
> > > >>>>>   //i.remove();
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>  }
> > > >>>>>
> > > >>>>>  eventsSeen = 0;
> > > >>>>>
> > > >>>>>  i.close();
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>  }
> > > >>>>>
> > > >>>>>
> > > >>>>> How do I retrieve the key and is there a way to remove it?
> i.remove
> > > >>>> throws
> > > >>>>> an exception.
> > > >>>>>
> > > >>>>>
> > > >>>>> - Shekar
> > > >>>>>
> > > >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com
> >
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Yi,
> > > >>>>>>
> > > >>>>>> Here is my config file:
> > > >>>>>> http://pastebin.com/Kf3C9E0h
> > > >>>>>>
> > > >>>>>> - S
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Reply via email to