Martin,

As seen below, I have only 1 partition. What else could be wrong?

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser

Topic:parser PartitionCount:1 ReplicationFactor:1 Configs:

Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0
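
A minimal sketch of the read-everything-then-clear pattern from the quoted messages below. It assumes a plain TreeMap as a stand-in for the single task's store; the class name DrainStoreSketch and the drain() helper are hypothetical, and none of this is the Samza KeyValueStore API. The sketch copies the entries out during iteration and deletes by key only once iteration has finished, rather than removing through the iterator.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DrainStoreSketch {
    // Hypothetical helper: returns a copy of every entry, then empties the store.
    // The TreeMap is only a stand-in for one task's local store.
    static List<Map.Entry<String, String>> drain(TreeMap<String, String> store) {
        // Pass 1: copy each entry so the result does not depend on the store afterwards.
        List<Map.Entry<String, String>> snapshot = new ArrayList<>();
        for (Map.Entry<String, String> e : store.entrySet()) {
            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        // Pass 2: delete by key after iteration is over, not via the iterator itself.
        for (Map.Entry<String, String> e : snapshot) {
            store.remove(e.getKey());
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("app-a", "3");
        store.put("app-b", "7");
        List<Map.Entry<String, String>> drained = drain(store);
        System.out.println("drained=" + drained + " remaining=" + store.size());
    }
}
```

With one partition there is one task and one store, so a single call like this sees every entry. With more partitions, each task would only drain its own store.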

- Shekar




On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com>
wrote:

> Hi Shekar,
>
> The store.all() iterator ought to give you the entire contents of the
> store. However, note that each partition of the input topic results in a
> separate StreamTask instance, which in turn has a separate store. So there
> will be as many stores as there are input partitions. Perhaps you're not
> seeing data appear because you're writing it in one partition, and trying
> to read it in another.
>
> The partitioning also answers what happens when you run your job on a
> cluster. Different partitions may be processed on different nodes, but all
> the messages in one input topic partition always go to the same StreamTask
> (and thus the same store). Thus, whether you have skew or not depends
> entirely on how you partition your input topic.
>
> Regarding atomic deletion: each StreamTask is single-threaded, so you
> don't have to worry about concurrency. If you want to delete all keys in
> the store, you can do so.
>
> Martin
>
> On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Any answer on how to get all the kv values and reinitialise the kv store?
> >
> > I had one more question on implementing a sliding window.
> >
> > If I use a kv store like rocksdb and run on yarn (say a 3-node cluster), the
> > job that does the aggregation gets distributed as well, and I am guessing
> > the aggregation numbers get skewed? Is that the right assessment?
> >
> > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >
> >> Also, next.getValue() or next.getKey() does not yield anything.
> >>
> >>   KeyValueIterator<String, String> i = store.all();
> >>   while (i.hasNext()) {
> >>     Entry<String, String> next = i.next();
> >>     log.info("Removed Key", next.getValue());
> >>   }
> >>
> >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>
> >>> Yi,
> >>>
> >>> There is no exception. I want to do a couple of things in the window:
> >>>
> >>> - Get all the keys and values and publish them to another store (like
> >>> graphite) as a list
> >>> - Remove all entries.
> >>>
> >>> I can iterate through the list later, but I want to be able to get all kv
> >>> values and delete all of them in an atomic operation.
> >>>
> >>> How do I do these operations on the kv store?
> >>>
> >>> - S
> >>>
> >>>
> >>>
> >>>
> >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >>>
> >>>> Hi, Shekar,
> >>>>
> >>>> Sorry I was not able to follow up w/ you in time. It is great that you
> >>>> have found the configuration problem and made it work!
> >>>>
> >>>> As for the exception on the iterator, could you send us the log w/ the
> >>>> exception?
> >>>>
> >>>> Thanks!
> >>>>
> >>>> -Yi
> >>>>
> >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>>>
> >>>>> Yi,
> >>>>>
> >>>>> Looks like it is working now. There was a redundant line in the config.
> >>>>>
> >>>>> I am able to initialize the kv store and add values.
> >>>>> In the window code, I am unable to retrieve them and mark them as 0.
> >>>>>
> >>>>> Here is my window code:
> >>>>>
> >>>>> public void window(MessageCollector collector,
> >>>>>     TaskCoordinator coordinator) {
> >>>>>   //store.delete(appName);
> >>>>>   collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
> >>>>>   KeyValueIterator<String, String> i = store.all();
> >>>>>   while (i.hasNext()) {
> >>>>>     Entry<String, String> next = i.next();
> >>>>>     log.info("Trying to remove Key", next.getKey());
> >>>>>     //i.remove();
> >>>>>   }
> >>>>>   eventsSeen = 0;
> >>>>>   i.close();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> How do I retrieve the key, and is there a way to remove it? i.remove()
> >>>>> throws an exception.
> >>>>>
> >>>>>
> >>>>> - Shekar
> >>>>>
> >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> >>>>>
> >>>>>> Yi,
> >>>>>>
> >>>>>> Here is my config file:
> >>>>>> http://pastebin.com/Kf3C9E0h
> >>>>>>
> >>>>>> - S
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
>
>
