Yi,

Interesting;y, I see these in changelog topic (Guessing these are window
counts):

 kafka-console-consumer.sh --zookeeper localhost:2181 --topic
window-changelog

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.

497

1436402016563

498

451

382

499

383

427


Code I have:


  KeyValueIterator<String, String> i = store.all();

  while(i.hasNext()){

   Entry <String, String> next = i.next();

   log.info("Processing Key", next.getKey());

  }

   i.close();


Interestingly I see that there are entries in store.all() as I see a bunch
of messages on the console but I am unable to get the key or the value :(.

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key

2015-07-08 17:36:46 WindowTask [INFO] Processing Key




On Mon, Jul 6, 2015 at 7:46 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Yi,
>
> I see incoming messages. I see that the counts are getting aggregated as
> well.
> But when I try to access it, I get null.
>
> - Shekar
>
> On Mon, Jul 6, 2015 at 4:50 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Shekar,
>>
>> Did you take a look at the stats to see:
>>
>> 1) Is there any incoming messages?
>> 2) Is there any messages in the changelog topic?
>>
>> Could you also try to change the log4j level to DEBUG to see whether we
>> can
>> see something in the log?
>>
>> Thanks!
>>
>> -Yi
>>
>> On Mon, Jul 6, 2015 at 4:43 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>
>> > Martin,
>> >
>> > As seen below, I have only 1 partition. What else could be wrong?
>> >
>> > *$ *kafka-topics.sh --describe --zookeeper localhost:2181 --topic parser
>> >
>> > Topic:parser PartitionCount:1 ReplicationFactor:1 Configs:
>> >
>> > Topic: parser Partition: 0 Leader: 0 Replicas: 0 Isr: 0
>> >
>> > - Shekar
>> >
>> >
>> >
>> >
>> > On Mon, Jul 6, 2015 at 1:29 PM, Martin Kleppmann <mar...@kleppmann.com>
>> > wrote:
>> >
>> > > Hi Shekar,
>> > >
>> > > The store.all() iterator ought to give you the entire contents of the
>> > > store. However, note that each partition of the input topic results
>> in a
>> > > separate StreamTask instance, which in turn has a separate store. So
>> > there
>> > > will be as many stores as there are input partitions. Perhaps you're
>> not
>> > > seeing data appear because you're writing it in one partition, and
>> trying
>> > > to read it in another.
>> > >
>> > > The partitioning also answers what happens when you run your job on a
>> > > cluster. Different partitions may be processed on different nodes, but
>> > all
>> > > the messages in one input topic partition always go to the same
>> > StreamTask
>> > > (and thus the same store). Thus, whether you have skew or not depends
>> > > entirely on how you partition your input topic.
>> > >
>> > > Regarding atomic deletion: each StreamTask is single-threaded, so you
>> > > don't have to worry about concurrency. If you want to delete all keys
>> in
>> > > the store, you can do so.
>> > >
>> > > Martin
>> > >
>> > > On 3 Jul 2015, at 17:46, Shekar Tippur <ctip...@gmail.com> wrote:
>> > >
>> > > > Any answer on how to get all the kv values and reinitialise the kv
>> > store?
>> > > >
>> > > > Had one more question on implementing sliding window.
>> > > >
>> > > > If i use a kv store like rocksdb, and I use yarn (say 3 node
>> cluster),
>> > > the
>> > > > job that it runs to aggregate gets distributed as well and I am
>> > guessing
>> > > > the aggregation numbers get skewed? Is that a right assessment?
>> > > >
>> > > > On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctip...@gmail.com>
>> > wrote:
>> > > >
>> > > >> Also, next.getValue() or next.getKey() does not yield anything.
>> > > >>
>> > > >>   KeyValueIterator<String, String> i = store.all();
>> > > >>
>> > > >>  while(i.hasNext()){
>> > > >>
>> > > >>       Entry <String, String> next = i.next();
>> > > >>
>> > > >>       log.info("Removed Key", next.getValue());
>> > > >>
>> > > >>  }
>> > > >>
>> > > >> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctip...@gmail.com>
>> > > wrote:
>> > > >>
>> > > >>> Yi,
>> > > >>>
>> > > >>> There is no exception. I want to do couple of things in the
>> window.
>> > > >>>
>> > > >>> - Get all the keys and values and publish to another store (like
>> > > >>> graphite) as a list
>> > > >>> - Remove all entries.
>> > > >>>
>> > > >>> I can iterate thro the list later but I want to be able to get
>> all kv
>> > > >>> values and delete all of them in an atomic operation.
>> > > >>>
>> > > >>> How do I do these operations on the kv store?
>> > > >>>
>> > > >>> - S
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpa...@gmail.com>
>> wrote:
>> > > >>>
>> > > >>>> Hi, Shekar,
>> > > >>>>
>> > > >>>> Sorry I was not able to follow up w/ you in time. It is great
>> that
>> > you
>> > > >>>> have
>> > > >>>> found the configure problem and made it work!
>> > > >>>>
>> > > >>>> As for the exception on the iterator, could you send us the log
>> w/
>> > the
>> > > >>>> exception?
>> > > >>>>
>> > > >>>> Thanks!
>> > > >>>>
>> > > >>>> -Yi
>> > > >>>>
>> > > >>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctip...@gmail.com
>> >
>> > > wrote:
>> > > >>>>
>> > > >>>>> Yi,
>> > > >>>>>
>> > > >>>>> Looks like it is working now. There was a redundant line in the
>> > > config.
>> > > >>>>>
>> > > >>>>> I am able to initialize kv store and add values.
>> > > >>>>> In the window code, I am unable to retrieve them and mark them
>> as
>> > 0.
>> > > >>>>>
>> > > >>>>> Here is my window code:
>> > > >>>>>
>> > > >>>>> public void window(MessageCollector collector,
>> > > >>>>>
>> > > >>>>>      TaskCoordinator coordinator) {
>> > > >>>>>
>> > > >>>>>  //store.delete(appName);
>> > > >>>>>
>> > > >>>>>  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>> > > >>>> eventsSeen));
>> > > >>>>>
>> > > >>>>>  KeyValueIterator<String, String> i = store.all();
>> > > >>>>>
>> > > >>>>>  while(i.hasNext()){
>> > > >>>>>
>> > > >>>>>   Entry <String, String> next = i.next();
>> > > >>>>>
>> > > >>>>>   log.info("Trying to remove Key", next.getKey());
>> > > >>>>>
>> > > >>>>>   //i.remove();
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>  }
>> > > >>>>>
>> > > >>>>>  eventsSeen = 0;
>> > > >>>>>
>> > > >>>>>  i.close();
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>  }
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> How do I retrieve the key and is there a way to remove it?
>> i.remove
>> > > >>>> throws
>> > > >>>>> an exception.
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> - Shekar
>> > > >>>>>
>> > > >>>>> On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <
>> ctip...@gmail.com>
>> > > >>>> wrote:
>> > > >>>>>
>> > > >>>>>> Yi,
>> > > >>>>>>
>> > > >>>>>> Here is my config file:
>> > > >>>>>> http://pastebin.com/Kf3C9E0h
>> > > >>>>>>
>> > > >>>>>> - S
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>>
>
>

Reply via email to