Re: Kafka windowed table not aggregating correctly

2016-12-13 Thread Matthias J. Sax
Just increase the retention time so the window is not dropped and can
accept later-arriving data.

About your example: the retention time specified via until() is a minimum
retention time! A window may be kept longer than that.
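
A minimal Java sketch of the window arithmetic in this example, assuming
hopping windows aligned to multiples of the advance interval. It is a
simplified model, not the Kafka Streams implementation: the until()
guarantee is modelled as "a window is guaranteed to be kept while stream
time is at most the retention period past the window start". The class and
method names (HoppingWindowSketch, windowStartsFor, isGuaranteedRetained)
are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of hopping windows; not the Kafka Streams implementation. */
final class HoppingWindowSketch {

    /** Start times of all windows [start, start + sizeMs) that contain timestampMs (non-negative). */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest window start, aligned to advanceMs, whose window still covers the timestamp.
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Assumption of this sketch: a window is guaranteed to be kept while stream time
     * has not moved more than retentionMs past the window start. It may be kept longer,
     * so a false result means "no longer guaranteed", not "certainly dropped".
     */
    static boolean isGuaranteedRetained(long windowStartMs, long streamTimeMs, long retentionMs) {
        return streamTimeMs - windowStartMs <= retentionMs;
    }

    public static void main(String[] args) {
        long size = 60 * 1000L;          // of(60 * 1000L)
        long advance = 30 * 1000L;       // advanceBy(30 * 1000L)
        long retention = 2 * 60 * 1000L; // until(2 * 60 * 1000L)

        // A record at 65 s belongs to the windows starting at 30 s and 60 s.
        System.out.println(windowStartsFor(65_000L, size, advance));
        // The window starting at 30 s is still guaranteed at stream time 125 s ...
        System.out.println(isGuaranteedRetained(30_000L, 125_000L, retention));
        // ... but no longer guaranteed at stream time 200 s.
        System.out.println(isGuaranteedRetained(30_000L, 200_000L, retention));
    }
}
```

Because the retention is only a lower bound, a false result from
isGuaranteedRetained does not mean the window has been dropped. That would
be consistent with results still landing in the first bucket after the
until time has elapsed.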


-Matthias

On 12/12/16 11:49 PM, Sachin Mittal wrote:
> Hi,
> Well, it does help in the case you mentioned, but consider the case where,
> on 2017 Dec 12 12:01 AM, we receive a message stamped 2017 Dec 11 11:59 PM:
> it will either drop this message or create a fresh older window, aggregate
> the message into that, and then drop the window.
> It is not clear which of the two it will do. But both cases are wrong here,
> as ideally it should have aggregated that message into the previous
> aggregation and not started a fresh older aggregation (since on Dec 12 12:00
> AM, we drop older windows and create fresh ones).
> 
> Could you please explain this case.
> 
> I am trying to reproduce this scenario and have written a small Java
> program which runs against the latest Kafka source, built against trunk at
> git commit 01d58ad8e039181ade742cf896a08199e3cb7483.
> 
> Here I am publishing messages with timestamps
> TS, TS + 5, TS + 1, TS + 6, TS + 2, TS + 7, TS + 3, TS + 8, TS + 4, TS + 9,
> TS + 5 ...
> I hope you get the idea: TS is generally increasing, but the next TS can
> have a value less than the previous one.
> 
> My window is
> TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)
> i.e. a 1-minute window advancing by 30 seconds, retained until 2 minutes,
> after which we discard the old window and create a new one.
> 
> What I observe is that it always aggregates the result into the first bucket
> it creates, even after the until timestamp has elapsed. So I am kind of
> confused here.
> 
> See if you can give me some insight into rolling window. Here is the code
> attached.
> 
> 
> Thanks
> Sachin
> --
> 
> import java.io.ByteArrayOutputStream;
> import java.util.Date;
> import java.util.Map;
> import java.util.Properties;
> import java.util.SortedSet;
> import java.util.TreeSet;
> 
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Aggregator;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.Initializer;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.TimeWindows;
> import org.apache.kafka.streams.kstream.Windowed;
> import org.apache.kafka.streams.processor.TimestampExtractor;
> 
> import com.fasterxml.jackson.core.type.TypeReference;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
> public class TestKafkaWindowStream {
> 
> public static void main(String[] args) {
> //start the producer
> Producer producerThread = new Producer();
> producerThread.start();
> //aggregate the messages via stream
> final Serde messageSerde = Serdes.serdeFrom(new
> MessageSerializer(), new MessageDeserializer());
> final Serde messagesSerde =
> Serdes.serdeFrom(new Serializer() {
> private ObjectMapper objectMapper = new ObjectMapper();
> public void close() {}
> public void configure(Map paramMap, boolean
> paramBoolean) {}
> public byte[] serialize(String paramString, SortedSet
> messages) {
> if (messages == null) {
> return null;
> }
> try {
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> objectMapper.writeValue(out, messages);
> return out.toByteArray();
> } catch (Exception e) {
> throw new SerializationException("Error serializing
> JSON message", e);
> }
> }
> }, new Deserializer() {
> private ObjectMapper objectMapper = new ObjectMapper();
> public void close() {}
> public void configure(Map paramMap, boolean
> paramBoolean) {}
> public SortedSet deserialize(String paramString,
> byte[] paramArrayOfByte) {
> if (paramArrayOfByte == null) {
> return null;
> }
> SortedSet data = 

Re: Kafka windowed table not aggregating correctly

2016-12-12 Thread Guozhang Wang
Hi Sachin,

Note that "until" means that the window will be retained for that period of
time after the window start time. So when you set the retention to 1 year, if
there is a message whose timestamp is 1 year + 1 sec beyond the "current
stream time", then yes, it will cause the window to be dropped. But in
practice, if you are confident that you are unlikely to receive a message
stamped 2017.Dec.12 (from your use case it seems possible that different
sources' clocks can be shifted by a bit, but not by as much as a year, right?),
then it still helps with the problem.
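
A minimal Java sketch of that reasoning, assuming "stream time" is modelled
as the largest timestamp seen so far and a record's lateness as stream time
minus its own timestamp. This is a simplified model, not the Kafka Streams
implementation. The class and method names (StreamTimeSketch, observe,
maxLateness) are hypothetical, and the interleaved offsets in seconds are
assumed sample data standing in for sources with slightly shifted clocks.

```java
/** Simplified model of stream time; not the Kafka Streams implementation. */
final class StreamTimeSketch {
    private long streamTimeMs = Long.MIN_VALUE;

    /** Advances stream time to the largest timestamp seen so far and returns the record's lateness. */
    long observe(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        return streamTimeMs - timestampMs;
    }

    /** Largest lateness over a sequence of timestamps, in the same unit as the input. */
    static long maxLateness(long[] timestamps) {
        StreamTimeSketch tracker = new StreamTimeSketch();
        long max = 0L;
        for (long ts : timestamps) {
            max = Math.max(max, tracker.observe(ts));
        }
        return max;
    }

    public static void main(String[] args) {
        long oneYearMs = 365L * 24 * 3600 * 1000;
        // Assumed sample data: generally increasing offsets (in seconds) with local disorder.
        long[] offsetsSec = {0, 5, 1, 6, 2, 7, 3, 8, 4, 9};
        long[] timestampsMs = new long[offsetsSec.length];
        for (int i = 0; i < offsetsSec.length; i++) {
            timestampsMs[i] = offsetsSec[i] * 1000L;
        }
        long worst = maxLateness(timestampsMs);
        System.out.println("max lateness ms = " + worst);
        System.out.println("within one-year retention = " + (worst <= oneYearMs));
    }
}
```

In this model a record is only at risk when its lateness exceeds the
retention period, so a skew of a few seconds is far inside a one-year
retention.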


Guozhang


On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal  wrote:

> Hi,
> I think the window retention period does not solve the problem, it only
> delays it.
> Based on what I understand, say I set the retention to 1 year using until.
> Then when I get a message with timestamp 1 year + 1 sec, it will delete
> the old windows and create new ones from that message.
> Now let us say we get the next message with timestamp 1 year - 1 sec; based
> on what you said, it will ignore this message.
>
> In my case we get messages from different sources whose clocks are not in
> sync. So overall, messages come with increasing timestamps, but over a short
> duration there is no ordering guarantee.
>
> So I think that before deleting the older windows it should retain a small
> portion of the old windows too, so nearby older messages are not dropped.
>
> I suggest having something like windows.size.advanceBy.until.retain
> Retain will keep the windows which fall within retain ms of the upper
> bound.
>
> So a window can be defined as
> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 *
> 1000L).until(365 * 24 * 3600 * 1000L).retain(900 * 1000L)
> So when dropping older windows it will retain the ones that fall in the last
> 15 minutes.
>
>
> Please let me know in case I missed something on how and if at all older
> messages are dropped.
>
> Thanks
> Sachin
>
>
>
>
>
> On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang  wrote:
>
> > Assuming your window retention period is the same as the window length,
> > then it is true that ZZ will cause the current window to be dropped. And
> > then when ZZA is received, it will not cause the old windows to be
> > re-created but will be ignored since it is considered "expired".
> >
> > Note that you can set the window retention period much longer than the
> > window length itself, using the "until" API I mentioned above, to handle
> > any sudden future records.
> >
> >
> >
> > Guozhang
> >
> > On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal 
> wrote:
> >
> > > Hi,
> > > Right now, in order to circumvent this problem, I am using a timestamp
> > > whose value increases by a few ms as and when I get new records.
> > > So let's say I have records in order
> > > A -> lower limit TS + 1 sec
> > > B -> lower limit TS + 3 sec
> > > C -> lower limit TS + 5 sec
> > > ..
> > > Z -> upper limit TS - 1 sec
> > >
> > > Now say I get a record ZZ with ts upper limit TS + 1 sec. I assume it
> > > will drop the previous windows and create new ones based on this
> > > timestamp. Please confirm this understanding.
> > >
> > > Now let's say I get a new record ZZA with timestamp (old) upper limit TS
> > > - 1 sec. Will this again cause the new windows to be dropped and the
> > > older windows to be recreated fresh, with all the older aggregation done
> > > so far lost?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Sachin,
> > > >
> > > > I am with you that ideally the windowing segmentation implementation
> > > > should be totally abstracted from users, but today it is a bit
> > > > confusing to understand. I filed a JIRA some time ago to improve on
> > > > this end:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3596
> > > >
> > > > So to your example, if a "far future record" was received whose
> > > > timestamp is beyond current time + the retention period, it could
> > > > potentially cause the current window to be dropped.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal 
> > > wrote:
> > > >
> > > > > Hi,
> > > > > I think now it makes all the sense. The field I was using for
> > timestamp
> > > > > extractor contains timestamps which spans for greater than a day's
> > > > duration
> > > > > and it worked for wall clock because for short duration timestamps
> > were
> > > > in
> > > > > day's range.
> > > > >
> > > > > I wanted to understand one thing:
> > > > > Say I have a timestamp extractor field and as record gets ingested
> > > future
> > > > > records will have increasing values for the timestamp.
> > > > >
> > > > > Now lets say default duration is one day. At a future time a record
> > > will
> > > > > have timestamp which now is greater than the initial day's range.
> > > > > What will happen then, it will create 

Re: Kafka windowed table not aggregating correctly

2016-12-08 Thread Sachin Mittal
Hi,
Right now, in order to circumvent this problem, I am using a timestamp whose
value increases by a few ms as and when I get new records.
So let's say I have records in order
A -> lower limit TS + 1 sec
B -> lower limit TS + 3 sec
C -> lower limit TS + 5 sec
..
Z -> upper limit TS - 1 sec

Now say I get a record ZZ with ts upper limit TS + 1 sec. I assume it will
drop the previous windows and create new ones based on this timestamp.
Please confirm this understanding.

Now let's say I get a new record ZZA with timestamp (old) upper limit TS - 1
sec. Will this again cause the new windows to be dropped and the older
windows to be recreated fresh, with all the older aggregation done so far lost?

Thanks
Sachin




On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang  wrote:

> Hello Sachin,
>
> I am with you that ideally the windowing segmentation implementation should
> be totally abstracted from users, but today it is a bit confusing to
> understand. I filed a JIRA some time ago to improve on this end:
>
> https://issues.apache.org/jira/browse/KAFKA-3596
>
> So to your example, if a "far future record" was received whose timestamp
> is beyond current time + the retention period, it could potentially cause
> the current window to be dropped.
>
>
> Guozhang
>
>
> On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal  wrote:
>
> > Hi,
> > I think it all makes sense now. The field I was using for the timestamp
> > extractor contains timestamps which span more than a day's duration,
> > and it worked for wall clock because over a short duration the timestamps
> > were within a day's range.
> >
> > I wanted to understand one thing:
> > Say I have a timestamp extractor field, and as records get ingested, future
> > records will have increasing values for the timestamp.
> >
> > Now let's say the default duration is one day. At a future time a record
> > will have a timestamp which is now greater than the initial day's range.
> > What will happen then? Will it create a new segment and then create
> > windows in it for the next day's duration?
> > What happens if it now gets a record from the previous day? Will it get
> > discarded, or will it again have just the single value aggregated in it
> > (previous values are lost)?
> > So when a new segment is created, as I understand it, does it retain the
> > older segments' data?
> >
> > This is a bit confusing, so it would be helpful if you could explain in a
> > bit more detail.
> >
> > Thanks
> > Sachin
> >
> >
> > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang 
> wrote:
> >
> > > Sachin,
> > >
> > > One thing to note is that the retention of the windowed stores works by
> > > keeping multiple segments of the store, where each segment stores a time
> > > range which can potentially span multiple windows. If a new window needs
> > > to be created that is further than the oldest segment's time range +
> > > retention period, the oldest segments are dropped. From your code it
> > > seems you do not override the retention period via until(...) in
> > > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
> > > default of one day is used.
> > >
> > > So with WallclockTimestampExtractor, since it uses system time, it won't
> > > give you timestamps that span more than a day during a short period of
> > > time, but if your own defined timestamps exceed that range, then old
> > > segments will be dropped immediately and hence the aggregate values will
> > > be returned as a single value.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > The extractor is used in
> > > >
> > > > org.apache.kafka.streams.processor.internals.
> > RecordQueue#addRawRecords()
> > > >
> > > > Let us know, if you could resolve the problem or need more help.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js client
> i
> > am
> > > > > using. The version is 0.5x. Can you please tell me what code in
> > streams
> > > > > calls the timestamp extractor. I can look there to see if there is
> > any
> > > > > issue.
> > > > >
> > > > > Again issue happens only when producing the messages using producer
> > > that
> > > > is
> > > > > compatible with kafka version 0.8x. I see that this producer does
> not
> > > > send
> > > > > a record timestamp as this was introduced in version 0.10 only.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> > > > wrote:
> > > > >
> > > > >> I am not sure what is happening. That's why it would be good to
> > have a
> > > > >> toy example to reproduce the issue.
> > > > >>
> > > > >> What do you mean by "Kafka node version 0.5"?
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > >>> I can provide with the data but data does not seem to be the
> issue.
> > > 

Re: Kafka windowed table not aggregating correctly

2016-12-08 Thread Guozhang Wang
Hello Sachin,

I am with you that ideally the windowing segmentation implementation should
be totally abstracted from users, but today it is a bit confusing to
understand. I filed a JIRA some time ago to improve on this end:

https://issues.apache.org/jira/browse/KAFKA-3596

So to your example, if a "far future record" was received whose timestamp
is beyond current time + the retention period, it could potentially cause
the current window to be dropped.


Guozhang


On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal  wrote:

> Hi,
> I think it all makes sense now. The field I was using for the timestamp
> extractor contains timestamps which span more than a day's duration,
> and it worked for wall clock because over a short duration the timestamps
> were within a day's range.
>
> I wanted to understand one thing:
> Say I have a timestamp extractor field, and as records get ingested, future
> records will have increasing values for the timestamp.
>
> Now let's say the default duration is one day. At a future time a record will
> have a timestamp which is now greater than the initial day's range.
> What will happen then? Will it create a new segment and then create windows
> in it for the next day's duration?
> What happens if it now gets a record from the previous day? Will it get
> discarded, or will it again have just the single value aggregated in it
> (previous values are lost)?
> So when a new segment is created, as I understand it, does it retain the
> older segments' data?
>
> This is a bit confusing, so it would be helpful if you could explain in a bit
> more detail.
>
> Thanks
> Sachin
>
>
> On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang  wrote:
>
> > Sachin,
> >
> > One thing to note is that the retention of the windowed stores works by
> > keeping multiple segments of the store, where each segment stores a time
> > range which can potentially span multiple windows. If a new window needs
> > to be created that is further than the oldest segment's time range +
> > retention period, the oldest segments are dropped. From your code it seems
> > you do not override the retention period via until(...) in
> > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
> > default of one day is used.
> >
> > So with WallclockTimestampExtractor, since it uses system time, it won't
> > give you timestamps that span more than a day during a short period of
> > time, but if your own defined timestamps exceed that range, then old
> > segments will be dropped immediately and hence the aggregate values will
> > be returned as a single value.
> >
> > Guozhang
> >
> >
> > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax 
> > wrote:
> >
> > > The extractor is used in
> > >
> > > org.apache.kafka.streams.processor.internals.
> RecordQueue#addRawRecords()
> > >
> > > Let us know, if you could resolve the problem or need more help.
> > >
> > > -Matthias
> > >
> > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > https://github.com/SOHU-Co/kafka-node/ this is the node js client i
> am
> > > > using. The version is 0.5x. Can you please tell me what code in
> streams
> > > > calls the timestamp extractor. I can look there to see if there is
> any
> > > > issue.
> > > >
> > > > Again issue happens only when producing the messages using producer
> > that
> > > is
> > > > compatible with kafka version 0.8x. I see that this producer does not
> > > send
> > > > a record timestamp as this was introduced in version 0.10 only.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> > > wrote:
> > > >
> > > >> I am not sure what is happening. That's why it would be good to
> have a
> > > >> toy example to reproduce the issue.
> > > >>
> > > >> What do you mean by "Kafka node version 0.5"?
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > >>> I can provide with the data but data does not seem to be the issue.
> > > >>> If I submit the same data and use same timestamp extractor  using
> the
> > > >> java
> > > >>> client with kafka version 0.10.0.1 aggregation works fine.
> > > >>> I find the issue only when submitting the data with kafka node
> > version
> > > >> 0.5.
> > > >>> It looks like the stream does not extract the time correctly in
> that
> > > >> case.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" wrote:
> > > >>>
> > > >>>> Can you provide example input data (including timestamps) and result.
> > > >>>> What is the expected result (ie, what aggregation do you apply)?
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > >>>>> Hi,
> > > >>>>> After much debugging I found an issue with timestamp extractor.
> > > >>>>>
> > > >>>>> If I use a custom timestamp extractor with following code:
> > > >>>>> public static class MessageTimestampExtractor 

Re: Kafka windowed table not aggregating correctly

2016-12-04 Thread Matthias J. Sax
To unsubscribe you need to send an email to

  users-unsubscr...@kafka.apache.org


-Matthias

On 12/3/16 6:13 PM, williamtellme123 wrote:
> Unsubscribe
> 
> 
> Sent via the Samsung Galaxy S7, an AT&T 4G LTE smartphone
> -------- Original message --------
> From: Guozhang Wang <wangg...@gmail.com>
> Date: 12/2/16 5:48 PM (GMT-06:00)
> To: users@kafka.apache.org
> Subject: Re: Kafka windowed table not aggregating correctly
> Sachin,
> 
> One thing to note is that the retention of the windowed stores works by
> keeping multiple segments of the store, where each segment stores a time
> range which can potentially span multiple windows. If a new window needs to
> be created that is further than the oldest segment's time range + retention
> period, the oldest segments are dropped. From your code it seems you do not
> override the retention period via until(...) in
> TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
> default of one day is used.
> 
> So with WallclockTimestampExtractor, since it uses system time, it won't give
> you timestamps that span more than a day during a short period of time,
> but if your own defined timestamps exceed that range, then old segments
> will be dropped immediately and hence the aggregate values will be returned
> as a single value.
> 
> Guozhang
> 
> 
> On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> The extractor is used in
>>
>> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>>
>> Let us know, if you could resolve the problem or need more help.
>>
>> -Matthias
>>
>> On 12/2/16 11:46 AM, Sachin Mittal wrote:
>>> https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
>>> using. The version is 0.5x. Can you please tell me what code in streams
>>> calls the timestamp extractor. I can look there to see if there is any
>>> issue.
>>>
>>> Again issue happens only when producing the messages using producer that
>> is
>>> compatible with kafka version 0.8x. I see that this producer does not
>> send
>>> a record timestamp as this was introduced in version 0.10 only.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io>
>> wrote:
>>>
>>>> I am not sure what is happening. That's why it would be good to have a
>>>> toy example to reproduce the issue.
>>>>
>>>> What do you mean by "Kafka node version 0.5"?
>>>>
>>>> -Matthias
>>>>
>>>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>>>> I can provide with the data but data does not seem to be the issue.
>>>>> If I submit the same data and use same timestamp extractor  using the
>>>> java
>>>>> client with kafka version 0.10.0.1 aggregation works fine.
>>>>> I find the issue only when submitting the data with kafka node version
>>>> 0.5.
>>>>> It looks like the stream does not extract the time correctly in that
>>>> case.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io>
>>>> wrote:
>>>>>
>>>>>> Can you provide example input data (including timetamps) and result.
>>>>>> What is the expected result (ie, what aggregation do you apply)?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> After much debugging I found an issue with timestamp extractor.
>>>>>>>
>>>>>>> If I use a custom timestamp extractor with following code:
>>>>>>>  public static class MessageTimestampExtractor implements
>>>>>>> TimestampExtractor {
>>>>>>>  public long extract(ConsumerRecord<Object, Object> record) {
>>>>>>>  if (record.value() instanceof Message) {
>>>>>>>  return ((Message) record.value()).ts;
>>>>>>>  } else {
>>>>>>>  return record.timestamp();
>>>>>>>  }
>>>>>>>  }
>>>>>>>  }
>>>>>>>
>>>>>>> Here message has a long field ts which stores the

Re: Kafka windowed table not aggregating correctly

2016-12-03 Thread williamtellme123
Unsubscribe


Sent via the Samsung Galaxy S7, an AT&T 4G LTE smartphone
-------- Original message --------
From: Guozhang Wang <wangg...@gmail.com>
Date: 12/2/16 5:48 PM (GMT-06:00)
To: users@kafka.apache.org
Subject: Re: Kafka windowed table not aggregating correctly
Sachin,

One thing to note is that the retention of the windowed stores works by
keeping multiple segments of the store, where each segment stores a time
range which can potentially span multiple windows. If a new window needs to
be created that is further than the oldest segment's time range + retention
period, the oldest segments are dropped. From your code it seems you do not
override the retention period via until(...) in
TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, it won't give
you timestamps that span more than a day during a short period of time,
but if your own defined timestamps exceed that range, then old segments
will be dropped immediately and hence the aggregate values will be returned
as a single value.

Guozhang


On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> The extractor is used in
>
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know, if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
> > using. The version is 0.5x. Can you please tell me what code in streams
> > calls the timestamp extractor. I can look there to see if there is any
> > issue.
> >
> > Again issue happens only when producing the messages using producer that
> is
> > compatible with kafka version 0.8x. I see that this producer does not
> send
> > a record timestamp as this was introduced in version 0.10 only.
> >
> > Thanks
> > Sachin
> >
> > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io>
> wrote:
> >
> >> I am not sure what is happening. That's why it would be good to have a
> >> toy example to reproduce the issue.
> >>
> >> What do you mean by "Kafka node version 0.5"?
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> >>> I can provide with the data but data does not seem to be the issue.
> >>> If I submit the same data and use same timestamp extractor  using the
> >> java
> >>> client with kafka version 0.10.0.1 aggregation works fine.
> >>> I find the issue only when submitting the data with kafka node version
> >> 0.5.
> >>> It looks like the stream does not extract the time correctly in that
> >> case.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io>
> >> wrote:
> >>>
> >>>> Can you provide example input data (including timetamps) and result.
> >>>> What is the expected result (ie, what aggregation do you apply)?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> After much debugging I found an issue with timestamp extractor.
> >>>>>
> >>>>> If I use a custom timestamp extractor with following code:
> >>>>> public static class MessageTimestampExtractor implements
> >>>>> TimestampExtractor {
> >>>>> public long extract(ConsumerRecord<Object, Object> record) {
> >>>>> if (record.value() instanceof Message) {
> >>>>> return ((Message) record.value()).ts;
> >>>>> } else {
> >>>>> return record.timestamp();
> >>>>> }
> >>>>> }
> >>>>> }
> >>>>>
> >>>>> Here message has a long field ts which stores the timestamp, the
> >>>>> aggregation does not work.
> >>>>> Note I have checked and ts has valid timestamp values.
> >>>>>
> >>>>> However if I replace it with say WallclockTimestampExtractor
> >> aggregation
> >>>> is
> >>>>> working fine.
> >>>>>
> >>>>> I do not understand what could be the issue here.
> >>>>>
> >>>>> Also note I am us

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Sachin Mittal
Hi,
I think it all makes sense now. The field I was using for the timestamp
extractor contains timestamps which span more than a day's duration,
and it worked for wall clock because over a short duration the timestamps
were within a day's range.

I wanted to understand one thing:
Say I have a timestamp extractor field, and as records get ingested, future
records will have increasing values for the timestamp.

Now let's say the default duration is one day. At a future time a record will
have a timestamp which is now greater than the initial day's range.
What will happen then? Will it create a new segment and then create windows
in it for the next day's duration?
What happens if it now gets a record from the previous day? Will it get
discarded, or will it again have just the single value aggregated in it
(previous values are lost)?
So when a new segment is created, as I understand it, does it retain the older
segments' data?

This is a bit confusing, so it would be helpful if you could explain in a bit
more detail.

Thanks
Sachin


On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang  wrote:

> Sachin,
>
> One thing to note is that the retention of the windowed stores works by
> keeping multiple segments of the store, where each segment stores a time
> range which can potentially span multiple windows. If a new window needs to
> be created that is further than the oldest segment's time range + retention
> period, the oldest segments are dropped. From your code it seems you do not
> override the retention period via until(...) in
> TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
> default of one day is used.
>
> So with WallclockTimestampExtractor, since it uses system time, it won't give
> you timestamps that span more than a day during a short period of time,
> but if your own defined timestamps exceed that range, then old segments
> will be dropped immediately and hence the aggregate values will be returned
> as a single value.
>
> Guozhang
>
>
> On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax 
> wrote:
>
> > The extractor is used in
> >
> > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> >
> > Let us know, if you could resolve the problem or need more help.
> >
> > -Matthias
> >
> > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
> > > using. The version is 0.5x. Can you please tell me what code in streams
> > > calls the timestamp extractor. I can look there to see if there is any
> > > issue.
> > >
> > > Again issue happens only when producing the messages using producer
> that
> > is
> > > compatible with kafka version 0.8x. I see that this producer does not
> > send
> > > a record timestamp as this was introduced in version 0.10 only.
> > >
> > > Thanks
> > > Sachin
> > >
> > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> > wrote:
> > >
> > >> I am not sure what is happening. That's why it would be good to have a
> > >> toy example to reproduce the issue.
> > >>
> > >> What do you mean by "Kafka node version 0.5"?
> > >>
> > >> -Matthias
> > >>
> > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > >>> I can provide with the data but data does not seem to be the issue.
> > >>> If I submit the same data and use same timestamp extractor  using the
> > >> java
> > >>> client with kafka version 0.10.0.1 aggregation works fine.
> > >>> I find the issue only when submitting the data with kafka node
> version
> > >> 0.5.
> > >>> It looks like the stream does not extract the time correctly in that
> > >> case.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" wrote:
> > >>>
> > >>>> Can you provide example input data (including timestamps) and result.
> > >>>> What is the expected result (ie, what aggregation do you apply)?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > >>>>> Hi,
> > >>>>> After much debugging I found an issue with timestamp extractor.
> > >>>>>
> > >>>>> If I use a custom timestamp extractor with following code:
> > >>>>> public static class MessageTimestampExtractor implements
> > >>>>> TimestampExtractor {
> > >>>>> public long extract(ConsumerRecord<Object, Object> record) {
> > >>>>> if (record.value() instanceof Message) {
> > >>>>> return ((Message) record.value()).ts;
> > >>>>> } else {
> > >>>>> return record.timestamp();
> > >>>>> }
> > >>>>> }
> > >>>>> }
> > >>>>>
> > >>>>> Here message has a long field ts which stores the timestamp, the
> > >>>>> aggregation does not work.
> > >>>>> Note I have checked and ts has valid timestamp values.
> > >>>>>
> > >>>>> However if I replace it with say WallclockTimestampExtractor
> > >>>>> aggregation is working fine.
> > >>>>>
> > >>>>> I do not understand what could be the issue here.
> > >>>>>
> > >>>>>

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Guozhang Wang
Sachin,

One thing to note is that the retention of the windowed stores works by
keeping multiple segments of the store, where each segment stores a time
range which can potentially span multiple windows. If a new window needs to
be created that is further than the oldest segment's time range + retention
period, the oldest segments are dropped. From your code it seems you do not
override the retention period via until(...) in
TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, it won't give
you timestamps that span more than a day during a short period of time,
but if your own defined timestamps exceed that range, then old segments
will be dropped immediately and hence the aggregate values will be returned
as a single value.

Guozhang
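
To make the retention effect described above concrete, here is a minimal
sketch in plain Java. It is a simplified model and not the Kafka Streams
implementation: the class WindowedCounts and its fields are hypothetical, it
drops individual windows rather than whole segments, and it simply ignores
records that arrive for an already expired window. It only illustrates why a
jump in record timestamps larger than the retention period makes an aggregate
start over.

```java
import java.util.TreeMap;

final class RetentionSketch {
    static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

    /** Simplified stand-in for a windowed store; NOT the Kafka Streams implementation. */
    static final class WindowedCounts {
        private final long sizeMs;
        private final long advanceMs;
        private final long retentionMs;
        private final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
        private long maxTimestampSeen = Long.MIN_VALUE;

        WindowedCounts(long sizeMs, long advanceMs, long retentionMs) {
            this.sizeMs = sizeMs;
            this.advanceMs = advanceMs;
            this.retentionMs = retentionMs;
        }

        void add(long timestampMs) {
            // The highest timestamp seen so far drives expiry in this model.
            maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
            long oldestKept = maxTimestampSeen - retentionMs;

            // Drop every window that starts before the retention horizon.
            countsByWindowStart.headMap(oldestKept, false).clear();

            // Count the record in each hopping window that contains its timestamp.
            long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
            for (; start <= timestampMs; start += advanceMs) {
                if (start >= oldestKept) {
                    countsByWindowStart.merge(start, 1, Integer::sum);
                }
            }
        }

        int countFor(long windowStart) {
            return countsByWindowStart.getOrDefault(windowStart, 0);
        }
    }

    public static void main(String[] args) {
        WindowedCounts store = new WindowedCounts(10_000L, 5_000L, ONE_DAY_MS);
        store.add(1_000L);
        store.add(2_000L);
        store.add(3_000L);
        System.out.println("window 0 before jump: " + store.countFor(0L));

        // A record stamped two days later pushes window 0 past the retention horizon.
        store.add(3_000L + 2 * ONE_DAY_MS);
        System.out.println("window 0 after jump: " + store.countFor(0L));
    }
}
```

With wall-clock timestamps the jump in main() cannot happen within a short
run, which matches the observation that WallclockTimestampExtractor
aggregates as expected. In the real API the retention period would be raised
with until(...) on the window definition.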


On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax 
wrote:

> The extractor is used in
>
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know, if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
> > using. The version is 0.5x. Can you please tell me what code in streams
> > calls the timestamp extractor. I can look there to see if there is any
> > issue.
> >
> > Again issue happens only when producing the messages using producer that
> is
> > compatible with kafka version 0.8x. I see that this producer does not
> send
> > a record timestamp as this was introduced in version 0.10 only.
> >
> > Thanks
> > Sachin
> >
> > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> wrote:
> >
> >> I am not sure what is happening. That's why it would be good to have a
> >> toy example to reproduce the issue.
> >>
> >> What do you mean by "Kafka node version 0.5"?
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> >>> I can provide with the data but data does not seem to be the issue.
> >>> If I submit the same data and use same timestamp extractor  using the
> >> java
> >>> client with kafka version 0.10.0.1 aggregation works fine.
> >>> I find the issue only when submitting the data with kafka node version
> >> 0.5.
> >>> It looks like the stream does not extract the time correctly in that
> >> case.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" 
> >> wrote:
> >>>
>  Can you provide example input data (including timetamps) and result.
>  What is the expected result (ie, what aggregation do you apply)?
> 
> 
>  -Matthias
> 
>  On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > Hi,
> > After much debugging I found an issue with timestamp extractor.
> >
> > If I use a custom timestamp extractor with following code:
> > public static class MessageTimestampExtractor implements
> > TimestampExtractor {
> > public long extract(ConsumerRecord record) {
> > if (record.value() instanceof Message) {
> > return ((Message) record.value()).ts;
> > } else {
> > return record.timestamp();
> > }
> > }
> > }
> >
> > Here message has a long field ts which stores the timestamp, the
> > aggregation does not work.
> > Note I have checked and ts has valid timestamp values.
> >
> > However if I replace it with say WallclockTimestampExtractor
> >> aggregation
>  is
> > working fine.
> >
> > I do not understand what could be the issue here.
> >
> > Also note I am using kafka streams version 0.10.0.1 and I am
> publishing
> > messages via
> > https://github.com/SOHU-Co/kafka-node/ whose version is quite old
> >> 0.5.x
> >
> > Let me know if there is some bug in time stamp extractions.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
>  wrote:
> >
> >> Sachin,
> >>
> >> This is indeed a bit wired, and we'd like to try to re-produce your
>  issue
> >> locally. Do you have a sample input data for us to try out?
> >>
> >> Guozhang
> >>
> >> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal  >
> >> wrote:
> >>
> >>> Hi,
> >>> I fixed that sorted set issue but I am facing a weird problem
> which I
>  am
> >>> not able to replicate.
> >>>
> >>> Here is the sample problem that I could isolate:
> >>> My class is like this:
> >>> public static class Message implements Comparable {
> >>> public long ts;
> >>> public String message;
> >>> public String key;
> >>> public Message() {};
> >>> public Message(long ts, String message, String key) {
> >>> 

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Matthias J. Sax
The extractor is used in

org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()

Let us know if you could resolve the problem or need more help.

-Matthias

On 12/2/16 11:46 AM, Sachin Mittal wrote:
> https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
> using. The version is 0.5x. Can you please tell me what code in streams
> calls the timestamp extractor. I can look there to see if there is any
> issue.
> 
> Again issue happens only when producing the messages using producer that is
> compatible with kafka version 0.8x. I see that this producer does not send
> a record timestamp as this was introduced in version 0.10 only.
> 
> Thanks
> Sachin
> 
> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax"  wrote:
> 
>> I am not sure what is happening. That's why it would be good to have a
>> toy example to reproduce the issue.
>>
>> What do you mean by "Kafka node version 0.5"?
>>
>> -Matthias
>>
>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>> I can provide with the data but data does not seem to be the issue.
>>> If I submit the same data and use same timestamp extractor  using the
>> java
>>> client with kafka version 0.10.0.1 aggregation works fine.
>>> I find the issue only when submitting the data with kafka node version
>> 0.5.
>>> It looks like the stream does not extract the time correctly in that
>> case.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" 
>> wrote:
>>>
 Can you provide example input data (including timetamps) and result.
 What is the expected result (ie, what aggregation do you apply)?


 -Matthias

 On 12/2/16 7:43 AM, Sachin Mittal wrote:
> Hi,
> After much debugging I found an issue with timestamp extractor.
>
> If I use a custom timestamp extractor with following code:
> public static class MessageTimestampExtractor implements
> TimestampExtractor {
> public long extract(ConsumerRecord record) {
> if (record.value() instanceof Message) {
> return ((Message) record.value()).ts;
> } else {
> return record.timestamp();
> }
> }
> }
>
> Here message has a long field ts which stores the timestamp, the
> aggregation does not work.
> Note I have checked and ts has valid timestamp values.
>
> However if I replace it with say WallclockTimestampExtractor
>> aggregation
 is
> working fine.
>
> I do not understand what could be the issue here.
>
> Also note I am using kafka streams version 0.10.0.1 and I am publishing
> messages via
> https://github.com/SOHU-Co/kafka-node/ whose version is quite old
>> 0.5.x
>
> Let me know if there is some bug in time stamp extractions.
>
> Thanks
> Sachin
>
>
>
> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
 wrote:
>
>> Sachin,
>>
>> This is indeed a bit wired, and we'd like to try to re-produce your
 issue
>> locally. Do you have a sample input data for us to try out?
>>
>> Guozhang
>>
>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
>> wrote:
>>
>>> Hi,
>>> I fixed that sorted set issue but I am facing a weird problem which I
 am
>>> not able to replicate.
>>>
>>> Here is the sample problem that I could isolate:
>>> My class is like this:
>>> public static class Message implements Comparable {
>>> public long ts;
>>> public String message;
>>> public String key;
>>> public Message() {};
>>> public Message(long ts, String message, String key) {
>>> this.ts = ts;
>>> this.key = key;
>>> this.message = message;
>>> }
>>> public int compareTo(Message paramT) {
>>> long ts1 = paramT.ts;
>>> return ts > ts1 ? 1 : -1;
>>> }
>>> }
>>>
>>> pipeline is like this:
>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
>>>  .map(new KeyValueMapper> Message>>()
 {
>>>  public KeyValue apply(String key, Message
>> value)
 {
>>>  return new KeyValue(value.key, value);
>>>   }
>>>  })
>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>> .aggregateByKey(new Initializer() {
>>> public SortedSet apply() {
>>> return new TreeSet();
>>> }
>>> }, new Aggregator() {
>>> public SortedSet apply(String aggKey, Message value,
>>> SortedSet aggregate) {
>>> aggregate.add(value);
>>> return aggregate;
>>> }

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Sachin Mittal
https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am using.
The version is 0.5.x. Can you please tell me which code in Streams calls the
timestamp extractor? I can look there to see if there is any issue.

Again, the issue happens only when producing the messages using a producer
that is compatible with Kafka version 0.8.x. I see that this producer does
not send a record timestamp, as this was introduced only in version 0.10.

Thanks
Sachin
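
Since a producer that predates record timestamps is involved, a guarded
extractor may be worth considering. The following is a minimal sketch in
plain Java without the Kafka classes, so the method extract(...) and the
nested Message type here are hypothetical stand-ins. It assumes that a record
produced without a timestamp surfaces as -1 on the consumer side (the value
of ConsumerRecord.NO_TIMESTAMP), and it assumes that falling back to
wall-clock time is acceptable for the application, which may not be the case.

```java
final class TimestampFallbackSketch {
    /** Assumed to mirror ConsumerRecord.NO_TIMESTAMP; kept local so the sketch has no Kafka dependency. */
    static final long NO_TIMESTAMP = -1L;

    /** Hypothetical stand-in for the Message payload from this thread. */
    static final class Message {
        final long ts;

        Message(long ts) {
            this.ts = ts;
        }
    }

    /**
     * Prefers the timestamp embedded in the payload, then the record timestamp,
     * and only then the supplied wall-clock time.
     */
    static long extract(Object value, long recordTimestampMs, long wallClockMs) {
        if (value instanceof Message) {
            long ts = ((Message) value).ts;
            if (ts >= 0) {
                return ts;
            }
        }
        if (recordTimestampMs != NO_TIMESTAMP && recordTimestampMs >= 0) {
            return recordTimestampMs;
        }
        // Neither source is usable, e.g. the record came from a pre-0.10 producer.
        return wallClockMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(extract(new Message(1_480_000_000_000L), NO_TIMESTAMP, now));
        System.out.println(extract("not a Message", NO_TIMESTAMP, now) == now);
    }
}
```

In a real TimestampExtractor the same three-step preference would sit inside
extract(ConsumerRecord<Object, Object> record), with
System.currentTimeMillis() as the last resort.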

On 3 Dec 2016 1:03 a.m., "Matthias J. Sax"  wrote:

> I am not sure what is happening. That's why it would be good to have a
> toy example to reproduce the issue.
>
> What do you mean by "Kafka node version 0.5"?
>
> -Matthias
>
> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > I can provide with the data but data does not seem to be the issue.
> > If I submit the same data and use same timestamp extractor  using the
> java
> > client with kafka version 0.10.0.1 aggregation works fine.
> > I find the issue only when submitting the data with kafka node version
> 0.5.
> > It looks like the stream does not extract the time correctly in that
> case.
> >
> > Thanks
> > Sachin
> >
> > On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" 
> wrote:
> >
> >> Can you provide example input data (including timetamps) and result.
> >> What is the expected result (ie, what aggregation do you apply)?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> After much debugging I found an issue with timestamp extractor.
> >>>
> >>> If I use a custom timestamp extractor with following code:
> >>> public static class MessageTimestampExtractor implements
> >>> TimestampExtractor {
> >>> public long extract(ConsumerRecord record) {
> >>> if (record.value() instanceof Message) {
> >>> return ((Message) record.value()).ts;
> >>> } else {
> >>> return record.timestamp();
> >>> }
> >>> }
> >>> }
> >>>
> >>> Here message has a long field ts which stores the timestamp, the
> >>> aggregation does not work.
> >>> Note I have checked and ts has valid timestamp values.
> >>>
> >>> However if I replace it with say WallclockTimestampExtractor
> aggregation
> >> is
> >>> working fine.
> >>>
> >>> I do not understand what could be the issue here.
> >>>
> >>> Also note I am using kafka streams version 0.10.0.1 and I am publishing
> >>> messages via
> >>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old
> 0.5.x
> >>>
> >>> Let me know if there is some bug in time stamp extractions.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>>
> >>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
> >> wrote:
> >>>
>  Sachin,
> 
>  This is indeed a bit wired, and we'd like to try to re-produce your
> >> issue
>  locally. Do you have a sample input data for us to try out?
> 
>  Guozhang
> 
>  On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
>  wrote:
> 
> > Hi,
> > I fixed that sorted set issue but I am facing a weird problem which I
> >> am
> > not able to replicate.
> >
> > Here is the sample problem that I could isolate:
> > My class is like this:
> > public static class Message implements Comparable {
> > public long ts;
> > public String message;
> > public String key;
> > public Message() {};
> > public Message(long ts, String message, String key) {
> > this.ts = ts;
> > this.key = key;
> > this.message = message;
> > }
> > public int compareTo(Message paramT) {
> > long ts1 = paramT.ts;
> > return ts > ts1 ? 1 : -1;
> > }
> > }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
> >  .map(new KeyValueMapper Message>>()
> >> {
> >  public KeyValue apply(String key, Message
> value)
> >> {
> >  return new KeyValue(value.key, value);
> >   }
> >  })
> > .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > .aggregateByKey(new Initializer() {
> > public SortedSet apply() {
> > return new TreeSet();
> > }
> > }, new Aggregator() {
> > public SortedSet apply(String aggKey, Message value,
> > SortedSet aggregate) {
> > aggregate.add(value);
> > return aggregate;
> > }
> > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > Serdes.String(), messagesSerde)
> > .foreach(new ForeachAction() {
> > public void apply(Windowed key, SortedSet
> >> messages)
>  {
> >  

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Matthias J. Sax
I am not sure what is happening. That's why it would be good to have a
toy example to reproduce the issue.

What do you mean by "Kafka node version 0.5"?

-Matthias

On 12/2/16 11:30 AM, Sachin Mittal wrote:
> I can provide with the data but data does not seem to be the issue.
> If I submit the same data and use same timestamp extractor  using the java
> client with kafka version 0.10.0.1 aggregation works fine.
> I find the issue only when submitting the data with kafka node version 0.5.
> It looks like the stream does not extract the time correctly in that case.
> 
> Thanks
> Sachin
> 
> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax"  wrote:
> 
>> Can you provide example input data (including timetamps) and result.
>> What is the expected result (ie, what aggregation do you apply)?
>>
>>
>> -Matthias
>>
>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>> Hi,
>>> After much debugging I found an issue with timestamp extractor.
>>>
>>> If I use a custom timestamp extractor with following code:
>>> public static class MessageTimestampExtractor implements
>>> TimestampExtractor {
>>> public long extract(ConsumerRecord record) {
>>> if (record.value() instanceof Message) {
>>> return ((Message) record.value()).ts;
>>> } else {
>>> return record.timestamp();
>>> }
>>> }
>>> }
>>>
>>> Here message has a long field ts which stores the timestamp, the
>>> aggregation does not work.
>>> Note I have checked and ts has valid timestamp values.
>>>
>>> However if I replace it with say WallclockTimestampExtractor aggregation
>> is
>>> working fine.
>>>
>>> I do not understand what could be the issue here.
>>>
>>> Also note I am using kafka streams version 0.10.0.1 and I am publishing
>>> messages via
>>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
>>>
>>> Let me know if there is some bug in time stamp extractions.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
>> wrote:
>>>
 Sachin,

 This is indeed a bit wired, and we'd like to try to re-produce your
>> issue
 locally. Do you have a sample input data for us to try out?

 Guozhang

 On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
 wrote:

> Hi,
> I fixed that sorted set issue but I am facing a weird problem which I
>> am
> not able to replicate.
>
> Here is the sample problem that I could isolate:
> My class is like this:
> public static class Message implements Comparable {
> public long ts;
> public String message;
> public String key;
> public Message() {};
> public Message(long ts, String message, String key) {
> this.ts = ts;
> this.key = key;
> this.message = message;
> }
> public int compareTo(Message paramT) {
> long ts1 = paramT.ts;
> return ts > ts1 ? 1 : -1;
> }
> }
>
> pipeline is like this:
> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
>  .map(new KeyValueMapper>()
>> {
>  public KeyValue apply(String key, Message value)
>> {
>  return new KeyValue(value.key, value);
>   }
>  })
> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> .aggregateByKey(new Initializer() {
> public SortedSet apply() {
> return new TreeSet();
> }
> }, new Aggregator() {
> public SortedSet apply(String aggKey, Message value,
> SortedSet aggregate) {
> aggregate.add(value);
> return aggregate;
> }
> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> Serdes.String(), messagesSerde)
> .foreach(new ForeachAction() {
> public void apply(Windowed key, SortedSet
>> messages)
 {
> ...
> }
> });
>
> So basically I rekey the original message into another topic and then
> aggregate it based on that key.
> What I have observed is that when I used windowed aggregation the
> aggregator does not use previous aggregated value.
>
> public SortedSet apply(String aggKey, Message value,
> SortedSet aggregate) {
> aggregate.add(value);
> return aggregate;
> }
>
> So in the above function the aggregate is an empty set of every value
> entering into pipeline. When I remove the windowed aggregation, the
> aggregate set retains previously aggregated values in the set.
>
> I am just not able to wrap my head around it. When I ran this type of
 test
> locally on windows it 

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Sachin Mittal
I can provide the data, but the data does not seem to be the issue.
If I submit the same data and use the same timestamp extractor using the Java
client with Kafka version 0.10.0.1, aggregation works fine.
I find the issue only when submitting the data with kafka node version 0.5.
It looks like the stream does not extract the time correctly in that case.

Thanks
Sachin

On 2 Dec 2016 11:41 p.m., "Matthias J. Sax"  wrote:

> Can you provide example input data (including timetamps) and result.
> What is the expected result (ie, what aggregation do you apply)?
>
>
> -Matthias
>
> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > Hi,
> > After much debugging I found an issue with timestamp extractor.
> >
> > If I use a custom timestamp extractor with following code:
> > public static class MessageTimestampExtractor implements
> > TimestampExtractor {
> > public long extract(ConsumerRecord record) {
> > if (record.value() instanceof Message) {
> > return ((Message) record.value()).ts;
> > } else {
> > return record.timestamp();
> > }
> > }
> > }
> >
> > Here message has a long field ts which stores the timestamp, the
> > aggregation does not work.
> > Note I have checked and ts has valid timestamp values.
> >
> > However if I replace it with say WallclockTimestampExtractor aggregation
> is
> > working fine.
> >
> > I do not understand what could be the issue here.
> >
> > Also note I am using kafka streams version 0.10.0.1 and I am publishing
> > messages via
> > https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
> >
> > Let me know if there is some bug in time stamp extractions.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
> wrote:
> >
> >> Sachin,
> >>
> >> This is indeed a bit wired, and we'd like to try to re-produce your
> issue
> >> locally. Do you have a sample input data for us to try out?
> >>
> >> Guozhang
> >>
> >> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
> >> wrote:
> >>
> >>> Hi,
> >>> I fixed that sorted set issue but I am facing a weird problem which I
> am
> >>> not able to replicate.
> >>>
> >>> Here is the sample problem that I could isolate:
> >>> My class is like this:
> >>> public static class Message implements Comparable {
> >>> public long ts;
> >>> public String message;
> >>> public String key;
> >>> public Message() {};
> >>> public Message(long ts, String message, String key) {
> >>> this.ts = ts;
> >>> this.key = key;
> >>> this.message = message;
> >>> }
> >>> public int compareTo(Message paramT) {
> >>> long ts1 = paramT.ts;
> >>> return ts > ts1 ? 1 : -1;
> >>> }
> >>> }
> >>>
> >>> pipeline is like this:
> >>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
> >>>  .map(new KeyValueMapper>()
> {
> >>>  public KeyValue apply(String key, Message value)
> {
> >>>  return new KeyValue(value.key, value);
> >>>   }
> >>>  })
> >>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>> .aggregateByKey(new Initializer() {
> >>> public SortedSet apply() {
> >>> return new TreeSet();
> >>> }
> >>> }, new Aggregator() {
> >>> public SortedSet apply(String aggKey, Message value,
> >>> SortedSet aggregate) {
> >>> aggregate.add(value);
> >>> return aggregate;
> >>> }
> >>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>> Serdes.String(), messagesSerde)
> >>> .foreach(new ForeachAction() {
> >>> public void apply(Windowed key, SortedSet
> messages)
> >> {
> >>> ...
> >>> }
> >>> });
> >>>
> >>> So basically I rekey the original message into another topic and then
> >>> aggregate it based on that key.
> >>> What I have observed is that when I used windowed aggregation the
> >>> aggregator does not use previous aggregated value.
> >>>
> >>> public SortedSet apply(String aggKey, Message value,
> >>> SortedSet aggregate) {
> >>> aggregate.add(value);
> >>> return aggregate;
> >>> }
> >>>
> >>> So in the above function the aggregate is an empty set of every value
> >>> entering into pipeline. When I remove the windowed aggregation, the
> >>> aggregate set retains previously aggregated values in the set.
> >>>
> >>> I am just not able to wrap my head around it. When I ran this type of
> >> test
> >>> locally on windows it is working fine. However a similar pipeline setup
> >>> when run against production on linux is behaving strangely and always
> >>> getting an empty aggregate set.
> >>> Any idea what could be the reason, where should I look at the problem.
> 

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Matthias J. Sax
Can you provide example input data (including timestamps) and the result?
What is the expected result (i.e., what aggregation do you apply)?


-Matthias

On 12/2/16 7:43 AM, Sachin Mittal wrote:
> Hi,
> After much debugging I found an issue with timestamp extractor.
> 
> If I use a custom timestamp extractor with following code:
> public static class MessageTimestampExtractor implements
> TimestampExtractor {
> public long extract(ConsumerRecord record) {
> if (record.value() instanceof Message) {
> return ((Message) record.value()).ts;
> } else {
> return record.timestamp();
> }
> }
> }
> 
> Here message has a long field ts which stores the timestamp, the
> aggregation does not work.
> Note I have checked and ts has valid timestamp values.
> 
> However if I replace it with say WallclockTimestampExtractor aggregation is
> working fine.
> 
> I do not understand what could be the issue here.
> 
> Also note I am using kafka streams version 0.10.0.1 and I am publishing
> messages via
> https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
> 
> Let me know if there is some bug in time stamp extractions.
> 
> Thanks
> Sachin
> 
> 
> 
> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang  wrote:
> 
>> Sachin,
>>
>> This is indeed a bit wired, and we'd like to try to re-produce your issue
>> locally. Do you have a sample input data for us to try out?
>>
>> Guozhang
>>
>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
>> wrote:
>>
>>> Hi,
>>> I fixed that sorted set issue but I am facing a weird problem which I am
>>> not able to replicate.
>>>
>>> Here is the sample problem that I could isolate:
>>> My class is like this:
>>> public static class Message implements Comparable {
>>> public long ts;
>>> public String message;
>>> public String key;
>>> public Message() {};
>>> public Message(long ts, String message, String key) {
>>> this.ts = ts;
>>> this.key = key;
>>> this.message = message;
>>> }
>>> public int compareTo(Message paramT) {
>>> long ts1 = paramT.ts;
>>> return ts > ts1 ? 1 : -1;
>>> }
>>> }
>>>
>>> pipeline is like this:
>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
>>>  .map(new KeyValueMapper>() {
>>>  public KeyValue apply(String key, Message value) {
>>>  return new KeyValue(value.key, value);
>>>   }
>>>  })
>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>> .aggregateByKey(new Initializer() {
>>> public SortedSet apply() {
>>> return new TreeSet();
>>> }
>>> }, new Aggregator() {
>>> public SortedSet apply(String aggKey, Message value,
>>> SortedSet aggregate) {
>>> aggregate.add(value);
>>> return aggregate;
>>> }
>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>> Serdes.String(), messagesSerde)
>>> .foreach(new ForeachAction() {
>>> public void apply(Windowed key, SortedSet messages)
>> {
>>> ...
>>> }
>>> });
>>>
>>> So basically I rekey the original message into another topic and then
>>> aggregate it based on that key.
>>> What I have observed is that when I used windowed aggregation the
>>> aggregator does not use previous aggregated value.
>>>
>>> public SortedSet apply(String aggKey, Message value,
>>> SortedSet aggregate) {
>>> aggregate.add(value);
>>> return aggregate;
>>> }
>>>
>>> So in the above function the aggregate is an empty set of every value
>>> entering into pipeline. When I remove the windowed aggregation, the
>>> aggregate set retains previously aggregated values in the set.
>>>
>>> I am just not able to wrap my head around it. When I ran this type of
>> test
>>> locally on windows it is working fine. However a similar pipeline setup
>>> when run against production on linux is behaving strangely and always
>>> getting an empty aggregate set.
>>> Any idea what could be the reason, where should I look at the problem.
>> Does
>>> length of key string matters here? I will later try to run the same
>> simple
>>> setup on linux and see what happens. But this is a very strange behavior.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang 
>>> wrote:
>>>
>>>> Hello Sachin,
>>>>
>>>> In the implementation of SortedSet, if the object's implemented the
>>>> Comparable interface, that compareTo function is applied in "
>>>> aggregate.add(value);", and hence if it returns 0, this element will not be
>>>> added since it is a Set.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal
>>>> wrote:

Re: Kafka windowed table not aggregating correctly

2016-12-02 Thread Sachin Mittal
Hi,
After much debugging I found an issue with timestamp extractor.

If I use a custom timestamp extractor with the following code:

    public static class MessageTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts;
            } else {
                return record.timestamp();
            }
        }
    }

Here Message has a long field ts which stores the timestamp. With this
extractor, the aggregation does not work.
Note that I have checked, and ts has valid timestamp values.

However, if I replace it with, say, WallclockTimestampExtractor, aggregation
works fine.

I do not understand what could be the issue here.

Also note that I am using Kafka Streams version 0.10.0.1 and I am publishing
messages via https://github.com/SOHU-Co/kafka-node/, whose version is quite
old (0.5.x).

Let me know if there is some bug in timestamp extraction.

Thanks
Sachin
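
Because the extracted timestamp decides which windows a record is aggregated
into, it can help to compute the window starts by hand for a few ts values
and compare them. Below is a minimal sketch in plain Java, assuming hopping
windows aligned to the epoch with the 10 second size and 5 second advance
used in this thread. The helper windowStartsFor(...) is a hypothetical name
for illustration and is not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowSketch {
    /**
     * Returns the start times (ms) of every hopping window that contains the
     * given timestamp, assuming window starts are multiples of advanceMs and
     * never negative.
     */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Two records one second apart share both of their windows.
        System.out.println(windowStartsFor(12_000L, 10_000L, 5_000L));
        System.out.println(windowStartsFor(13_000L, 10_000L, 5_000L));
        // A record stamped a day later lands in entirely different windows.
        System.out.println(windowStartsFor(12_000L + 86_400_000L, 10_000L, 5_000L));
    }
}
```

If consecutive records never share a window start, every aggregate would be
expected to contain a single value even when the extractor itself works.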



On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang  wrote:

> Sachin,
>
> This is indeed a bit wired, and we'd like to try to re-produce your issue
> locally. Do you have a sample input data for us to try out?
>
> Guozhang
>
> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal 
> wrote:
>
> > Hi,
> > I fixed that sorted set issue but I am facing a weird problem which I am
> > not able to replicate.
> >
> > Here is the sample problem that I could isolate:
> > My class is like this:
> > public static class Message implements Comparable {
> > public long ts;
> > public String message;
> > public String key;
> > public Message() {};
> > public Message(long ts, String message, String key) {
> > this.ts = ts;
> > this.key = key;
> > this.message = message;
> > }
> > public int compareTo(Message paramT) {
> > long ts1 = paramT.ts;
> > return ts > ts1 ? 1 : -1;
> > }
> > }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
> >  .map(new KeyValueMapper>() {
> >  public KeyValue apply(String key, Message value) {
> >  return new KeyValue(value.key, value);
> >   }
> >  })
> > .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > .aggregateByKey(new Initializer() {
> > public SortedSet apply() {
> > return new TreeSet();
> > }
> > }, new Aggregator() {
> > public SortedSet apply(String aggKey, Message value,
> > SortedSet aggregate) {
> > aggregate.add(value);
> > return aggregate;
> > }
> > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > Serdes.String(), messagesSerde)
> > .foreach(new ForeachAction() {
> > public void apply(Windowed key, SortedSet messages)
> {
> > ...
> > }
> > });
> >
> > So basically I rekey the original message into another topic and then
> > aggregate it based on that key.
> > What I have observed is that when I used windowed aggregation the
> > aggregator does not use previous aggregated value.
> >
> > public SortedSet apply(String aggKey, Message value,
> > SortedSet aggregate) {
> > aggregate.add(value);
> > return aggregate;
> > }
> >
> > So in the above function the aggregate is an empty set of every value
> > entering into pipeline. When I remove the windowed aggregation, the
> > aggregate set retains previously aggregated values in the set.
> >
> > I am just not able to wrap my head around it. When I ran this type of
> test
> > locally on windows it is working fine. However a similar pipeline setup
> > when run against production on linux is behaving strangely and always
> > getting an empty aggregate set.
> > Any idea what could be the reason, where should I look at the problem.
> Does
> > length of key string matters here? I will later try to run the same
> simple
> > setup on linux and see what happens. But this is a very strange behavior.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang 
> > wrote:
> >
> > > Hello Sachin,
> > >
> > > In the implementation of SortedSet, if the object's implemented the
> > > Comparable interface, that compareTo function is applied in "
> > > aggregate.add(value);", and hence if it returns 0, this element will
> not
> > be
> > > added since it is a Set.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal 
> > > wrote:
> > >
> > > > Hi,
> > > > What I find is that when I use sorted set as aggregation it fails to
> > > > aggregate the values which have compareTo returning 0.
> > > >
> > > > My class is like this:
> > > > public class Message implements Comparable {
> > > >

Re: Kafka windowed table not aggregating correctly

2016-11-28 Thread Guozhang Wang
Sachin,

This is indeed a bit weird, and we'd like to try to reproduce your issue
locally. Do you have sample input data for us to try out?

Guozhang

On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal  wrote:

> Hi,
> I fixed that sorted set issue but I am facing a weird problem which I am
> not able to replicate.
>
> Here is the sample problem that I could isolate:
> My class is like this:
> public static class Message implements Comparable<Message> {
>     public long ts;
>     public String message;
>     public String key;
>     public Message() {};
>     public Message(long ts, String message, String key) {
>         this.ts = ts;
>         this.key = key;
>         this.message = message;
>     }
>     public int compareTo(Message paramT) {
>         long ts1 = paramT.ts;
>         return ts > ts1 ? 1 : -1;
>     }
> }
>
> pipeline is like this:
> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>         public KeyValue<String, Message> apply(String key, Message value) {
>             return new KeyValue<String, Message>(value.key, value);
>         }
>     })
>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>         public SortedSet<Message> apply() {
>             return new TreeSet<Message>();
>         }
>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>         public SortedSet<Message> apply(String aggKey, Message value,
>                 SortedSet<Message> aggregate) {
>             aggregate.add(value);
>             return aggregate;
>         }
>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>     Serdes.String(), messagesSerde)
>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>             ...
>         }
>     });
>
> So basically I rekey the original message into another topic and then
> aggregate it based on that key.
> What I have observed is that when I used windowed aggregation the
> aggregator does not use previous aggregated value.
>
> public SortedSet apply(String aggKey, Message value,
> SortedSet aggregate) {
> aggregate.add(value);
> return aggregate;
> }
>
> So in the above function the aggregate is an empty set of every value
> entering into pipeline. When I remove the windowed aggregation, the
> aggregate set retains previously aggregated values in the set.
>
> I am just not able to wrap my head around it. When I ran this type of test
> locally on windows it is working fine. However a similar pipeline setup
> when run against production on linux is behaving strangely and always
> getting an empty aggregate set.
> Any idea what could be the reason, where should I look at the problem. Does
> length of key string matters here? I will later try to run the same simple
> setup on linux and see what happens. But this is a very strange behavior.
>
> Thanks
> Sachin
>
>
>
> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang 
> wrote:
>
> > Hello Sachin,
> >
> > In the implementation of SortedSet, if the object's implemented the
> > Comparable interface, that compareTo function is applied in "
> > aggregate.add(value);", and hence if it returns 0, this element will not
> be
> > added since it is a Set.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal 
> > wrote:
> >
> > > Hi,
> > > What I find is that when I use sorted set as aggregation it fails to
> > > aggregate the values which have compareTo returning 0.
> > >
> > > My class is like this:
> > > public class Message implements Comparable {
> > > public long ts;
> > > public String message;
> > > public Message() {};
> > > public Message(long ts, String message) {
> > > this.ts = ts;
> > > this.message = message;
> > > }
> > > public int compareTo(Message paramT) {
> > > long ts1 = paramT.ts;
> > > return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > }
> > > }
> > >
> > > pipeline is like this:
> > > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > .aggregateByKey(new Initializer() {
> > > public SortedSet apply() {
> > > return new TreeSet();
> > > }
> > > }, new Aggregator() {
> > > public SortedSet apply(String aggKey, Message value,
> > > SortedSet aggregate) {
> > > aggregate.add(value);
> > > return aggregate;
> > > }
> > > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > Serdes.String(), messagesSerde)
> > > .foreach(new ForeachAction() {
> > > public void apply(Windowed key, SortedSet
> messages)
> > {
> > > ...
> > > }
> > > });
> > >
> > > So any message published between 10 and 20 seconds gets aggregated in
> 10
> > -
> > > 20 bucket and I print the size of the 

Re: Kafka windowed table not aggregating correctly

2016-11-25 Thread Sachin Mittal
Hi,
I fixed that sorted set issue but I am facing a weird problem which I am
not able to replicate.

Here is the sample problem that I could isolate:
My class is like this:
public static class Message implements Comparable<Message> {
    public long ts;
    public String message;
    public String key;
    public Message() {};
    public Message(long ts, String message, String key) {
        this.ts = ts;
        this.key = key;
        this.message = message;
    }
    public int compareTo(Message paramT) {
        long ts1 = paramT.ts;
        return ts > ts1 ? 1 : -1;
    }
}

pipeline is like this:
builder.stream(Serdes.String(), messageSerde, "test-window-stream")
    .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
        public KeyValue<String, Message> apply(String key, Message value) {
            return new KeyValue<String, Message>(value.key, value);
        }
    })
    .through(Serdes.String(), messageSerde, "test-window-key-stream")
    .aggregateByKey(new Initializer<SortedSet<Message>>() {
        public SortedSet<Message> apply() {
            return new TreeSet<Message>();
        }
    }, new Aggregator<String, Message, SortedSet<Message>>() {
        public SortedSet<Message> apply(String aggKey, Message value,
                SortedSet<Message> aggregate) {
            aggregate.add(value);
            return aggregate;
        }
    }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
    Serdes.String(), messagesSerde)
    .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
        public void apply(Windowed<String> key, SortedSet<Message> messages) {
            ...
        }
    });

So basically I rekey the original message into another topic and then
aggregate it based on that key.
What I have observed is that when I use windowed aggregation, the
aggregator does not use the previously aggregated value.

public SortedSet<Message> apply(String aggKey, Message value,
        SortedSet<Message> aggregate) {
    aggregate.add(value);
    return aggregate;
}

So in the above function the aggregate is an empty set for every value
entering the pipeline. When I remove the windowed aggregation, the
aggregate set retains the previously aggregated values.

I am just not able to wrap my head around it. When I ran this type of test
locally on Windows it worked fine. However, a similar pipeline setup
run against production on Linux behaves strangely and always
gets an empty aggregate set.
Any idea what the reason could be, or where I should look for the problem?
Does the length of the key string matter here? I will later try to run the
same simple setup on Linux and see what happens. But this is very strange
behavior.

Thanks
Sachin



On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang  wrote:

> Hello Sachin,
>
> In the implementation of SortedSet, if the object's implemented the
> Comparable interface, that compareTo function is applied in "
> aggregate.add(value);", and hence if it returns 0, this element will not be
> added since it is a Set.
>
>
> Guozhang
>
>
> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal 
> wrote:
>
> > Hi,
> > What I find is that when I use sorted set as aggregation it fails to
> > aggregate the values which have compareTo returning 0.
> >
> > My class is like this:
> > public class Message implements Comparable {
> > public long ts;
> > public String message;
> > public Message() {};
> > public Message(long ts, String message) {
> > this.ts = ts;
> > this.message = message;
> > }
> > public int compareTo(Message paramT) {
> > long ts1 = paramT.ts;
> > return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > }
> > }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > .aggregateByKey(new Initializer() {
> > public SortedSet apply() {
> > return new TreeSet();
> > }
> > }, new Aggregator() {
> > public SortedSet apply(String aggKey, Message value,
> > SortedSet aggregate) {
> > aggregate.add(value);
> > return aggregate;
> > }
> > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > Serdes.String(), messagesSerde)
> > .foreach(new ForeachAction() {
> > public void apply(Windowed key, SortedSet messages)
> {
> > ...
> > }
> > });
> >
> > So any message published between 10 and 20 seconds gets aggregated in 10
> -
> > 20 bucket and I print the size of the set.
> > However output I get is following:
> >
> > Published: 14
> > Aggregated: 10  20 -> 1
> >
> > Published: 18
> > Aggregated: 10  20 -> 2
> >
> > Published: 11
> > Aggregated: 10  20 -> 3
> >
> > Published: 17
> > Aggregated: 10  20 -> 4
> >
> > Published: 14
> > Aggregated: 10  20 -> 4
> >
> > Published: 15
> > Aggregated: 10  20 -> 5
> >
> > Published: 12
> > Aggregated: key2  10  20 -> 6
> >
> > Published: 12
> > Aggregated: 10  20 -> 6
> >
> > So if you see any message that occurs 

Re: Kafka windowed table not aggregating correctly

2016-11-22 Thread Guozhang Wang
Hello Sachin,

In the implementation of SortedSet, if the object implements the
Comparable interface, its compareTo function is applied in
"aggregate.add(value);", and hence if it returns 0, the element will not be
added, since it is a Set.
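
A minimal sketch of this behaviour in plain Java, assuming a Message class
shaped like the one in this thread. The class name SortedSetDemo, the helper
methods, and the tie-breaking comparator BY_TS_THEN_TEXT are hypothetical
names for illustration only; the comparator is one possible way to keep
messages that share a timestamp, not necessarily the right fix for every
case.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class SortedSetDemo {

    // Same shape as the Message class in this thread: ordering looks at ts only.
    public static class Message implements Comparable<Message> {
        public final long ts;
        public final String message;

        public Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        @Override
        public int compareTo(Message other) {
            return Long.compare(ts, other.ts);
        }
    }

    // Hypothetical alternative: order by ts, then by message text as a tie-breaker.
    static final Comparator<Message> BY_TS_THEN_TEXT =
            Comparator.comparingLong((Message m) -> m.ts)
                      .thenComparing((Message m) -> m.message);

    // Adds three messages, two of which share ts = 14, and returns the set size.
    static int fill(SortedSet<Message> set) {
        set.add(new Message(14, "first"));
        set.add(new Message(18, "second"));
        set.add(new Message(14, "third")); // compares as 0 against "first" when only ts is used
        return set.size();
    }

    public static int sizeWithNaturalOrdering() {
        return fill(new TreeSet<Message>());
    }

    public static int sizeWithTieBreaker() {
        return fill(new TreeSet<Message>(BY_TS_THEN_TEXT));
    }

    public static void main(String[] args) {
        System.out.println(sizeWithNaturalOrdering()); // prints 2: the second ts = 14 is dropped
        System.out.println(sizeWithTieBreaker());      // prints 3: the tie-breaker keeps it
    }
}
```

Note that with the tie-breaker, two messages with the same ts and the same
text would still count as one element.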


Guozhang


On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal  wrote:

> Hi,
> What I find is that when I use sorted set as aggregation it fails to
> aggregate the values which have compareTo returning 0.
>
> My class is like this:
> public class Message implements Comparable {
> public long ts;
> public String message;
> public Message() {};
> public Message(long ts, String message) {
> this.ts = ts;
> this.message = message;
> }
> public int compareTo(Message paramT) {
> long ts1 = paramT.ts;
> return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> }
> }
>
> pipeline is like this:
> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> .aggregateByKey(new Initializer() {
> public SortedSet apply() {
> return new TreeSet();
> }
> }, new Aggregator() {
> public SortedSet apply(String aggKey, Message value,
> SortedSet aggregate) {
> aggregate.add(value);
> return aggregate;
> }
> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> Serdes.String(), messagesSerde)
> .foreach(new ForeachAction() {
> public void apply(Windowed key, SortedSet messages) {
> ...
> }
> });
>
> So any message published between 10 and 20 seconds gets aggregated in 10 -
> 20 bucket and I print the size of the set.
> However output I get is following:
>
> Published: 14
> Aggregated: 10  20 -> 1
>
> Published: 18
> Aggregated: 10  20 -> 2
>
> Published: 11
> Aggregated: 10  20 -> 3
>
> Published: 17
> Aggregated: 10  20 -> 4
>
> Published: 14
> Aggregated: 10  20 -> 4
>
> Published: 15
> Aggregated: 10  20 -> 5
>
> Published: 12
> Aggregated: key2  10  20 -> 6
>
> Published: 12
> Aggregated: 10  20 -> 6
>
> So if you see any message that occurs again for same second, where
> compareTo returns 0, it fails to get aggregated in the pipeline.
> Notice ones published at 14 and 12 seconds.
>
> Now I am not sure if problem is with Java ie I should use Comparator
> interface and not Comparable for my Message object. Or the problem is with
> Kafka stream or with serializing and de-serializing the set of messages. If
> I replace Set with List all is working fine.
>
> Anyway any ideas here would be appreciated, meanwhile let me see what is
> the best java practice here.
>
> Thanks
> Sachin
>
>
>
> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll 
> wrote:
>
> > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal 
> wrote:
> >
> > > I am using kafka_2.10-0.10.0.1.
> > > Say I am having a window of 60 minutes advanced by 15 minutes.
> > > If the stream app using timestamp extractor puts the message in one or
> > more
> > > bucket(s), it will get aggregated in those buckets.
> > > I assume this statement is correct.
> > >
> >
> > Yes.
> >
> >
> >
> > >
> > > Also say when I restart the streams application then bucket aggregation
> > > will resume from last point of halt.
> > > I hope this is also correct.
> > >
> >
> > Yes.
> >
> >
> >
> > > What I noticed that once a message is placed in one bucket, that bucket
> > was
> > > not getting new messages.
> > >
> >
> > This should not happen...
> >
> >
> > > However when I ran a small test case replicating that, it is working
> > > properly. There maybe some issues in application reset.
> > >
> >
> > ...and apparently it works (as expected) in your small test case.
> >
> > Do you have any further information that you could share with us so we
> can
> > help you better?  What's the difference, for example, between your
> "normal"
> > use case and the small test case you have been referring to?
> >
> >
> > -Michael
> >
>



-- 
-- Guozhang


Re: Kafka windowed table not aggregating correctly

2016-11-21 Thread Sachin Mittal
Hi,
What I find is that when I use sorted set as aggregation it fails to
aggregate the values which have compareTo returning 0.

My class is like this:
public class Message implements Comparable<Message> {
    public long ts;
    public String message;
    public Message() {};
    public Message(long ts, String message) {
        this.ts = ts;
        this.message = message;
    }
    public int compareTo(Message paramT) {
        long ts1 = paramT.ts;
        return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
    }
}

pipeline is like this:
builder.stream(Serdes.String(), messageSerde, "test-window-stream")
    .aggregateByKey(new Initializer<SortedSet<Message>>() {
        public SortedSet<Message> apply() {
            return new TreeSet<Message>();
        }
    }, new Aggregator<String, Message, SortedSet<Message>>() {
        public SortedSet<Message> apply(String aggKey, Message value,
                SortedSet<Message> aggregate) {
            aggregate.add(value);
            return aggregate;
        }
    }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
    Serdes.String(), messagesSerde)
    .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
        public void apply(Windowed<String> key, SortedSet<Message> messages) {
            ...
        }
    });

So any message published between 10 and 20 seconds gets aggregated in the
10 - 20 bucket and I print the size of the set.
However, the output I get is the following:

Published: 14
Aggregated: 10  20 -> 1

Published: 18
Aggregated: 10  20 -> 2

Published: 11
Aggregated: 10  20 -> 3

Published: 17
Aggregated: 10  20 -> 4

Published: 14
Aggregated: 10  20 -> 4

Published: 15
Aggregated: 10  20 -> 5

Published: 12
Aggregated: key2  10  20 -> 6

Published: 12
Aggregated: 10  20 -> 6

So, as you can see, any message that occurs again for the same second, where
compareTo returns 0, fails to get aggregated in the pipeline.
Notice the ones published at 14 and 12 seconds.

Now I am not sure if the problem is with Java, i.e. I should use the
Comparator interface and not Comparable for my Message object, or whether the
problem is with Kafka Streams or with serializing and deserializing the set
of messages. If I replace Set with List, everything works fine.

Anyway, any ideas here would be appreciated; meanwhile let me see what
the best Java practice is here.

Thanks
Sachin



On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll  wrote:

> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > I am using kafka_2.10-0.10.0.1.
> > Say I am having a window of 60 minutes advanced by 15 minutes.
> > If the stream app using timestamp extractor puts the message in one or
> more
> > bucket(s), it will get aggregated in those buckets.
> > I assume this statement is correct.
> >
>
> Yes.
>
>
>
> >
> > Also say when I restart the streams application then bucket aggregation
> > will resume from last point of halt.
> > I hope this is also correct.
> >
>
> Yes.
>
>
>
> > What I noticed that once a message is placed in one bucket, that bucket
> was
> > not getting new messages.
> >
>
> This should not happen...
>
>
> > However when I ran a small test case replicating that, it is working
> > properly. There maybe some issues in application reset.
> >
>
> ...and apparently it works (as expected) in your small test case.
>
> Do you have any further information that you could share with us so we can
> help you better?  What's the difference, for example, between your "normal"
> use case and the small test case you have been referring to?
>
>
> -Michael
>


Re: Kafka windowed table not aggregating correctly

2016-11-21 Thread Michael Noll
On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal  wrote:

> I am using kafka_2.10-0.10.0.1.
> Say I am having a window of 60 minutes advanced by 15 minutes.
> If the stream app using timestamp extractor puts the message in one or more
> bucket(s), it will get aggregated in those buckets.
> I assume this statement is correct.
>

Yes.



>
> Also say when I restart the streams application then bucket aggregation
> will resume from last point of halt.
> I hope this is also correct.
>

Yes.



> What I noticed that once a message is placed in one bucket, that bucket was
> not getting new messages.
>

This should not happen...


> However when I ran a small test case replicating that, it is working
> properly. There maybe some issues in application reset.
>

...and apparently it works (as expected) in your small test case.

Do you have any further information that you could share with us so we can
help you better?  What's the difference, for example, between your "normal"
use case and the small test case you have been referring to?


-Michael


Re: Kafka windowed table not aggregating correctly

2016-11-21 Thread Sachin Mittal
I am using kafka_2.10-0.10.0.1.
Say I have a window of 60 minutes advanced by 15 minutes.
If the stream app, using the timestamp extractor, puts the message in one or
more bucket(s), it will get aggregated in those buckets.
I assume this statement is correct.
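
To make the bucket assignment concrete, here is a minimal sketch in plain
Java (no Kafka dependency). It assumes windows are aligned to the epoch, with
an inclusive start and an exclusive end; HoppingWindows and windowStartsFor
are hypothetical names for illustration, not Kafka Streams API, and the real
implementation may differ in details.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {

    // Returns the start of every window [start, start + sizeMs) that contains
    // timestampMs, assuming window starts are multiples of advanceMs.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        // 60-minute windows advanced by 15 minutes; a record stamped at minute 70.
        for (long start : windowStartsFor(70 * minute, 60 * minute, 15 * minute)) {
            System.out.println((start / minute) + " - " + ((start + 60 * minute) / minute));
        }
        // prints:
        // 15 - 75
        // 30 - 90
        // 45 - 105
        // 60 - 120
    }
}
```

With a 60-minute window advanced by 15 minutes, each record therefore lands
in four overlapping buckets under these assumptions.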

Also, say I restart the streams application; then bucket aggregation
will resume from the last point of halt.
I hope this is also correct.

What I noticed is that once a message is placed in one bucket, that bucket
was not getting new messages.

However, when I ran a small test case replicating that, it worked
properly. There may be some issues in application reset.

Thanks
Sachin



On Fri, Nov 18, 2016 at 11:30 PM, Guozhang Wang  wrote:

> Hello Sachin,
>
> Which version of Kafka are you using for this application?
>
>
> Guozhang
>
>
> On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal  wrote:
>
> > Hi,
> > I have a simple pipeline
> > stream.aggregateByKey(new Initializer() {
> > public List apply() {
> > return new List
> > }
> > }, new Aggregator() {
> > public List apply(key, value, list) {
> > list.add(value)
> > return list
> > }
> > }, keysSerde, valuesSerde, "table")
> >
> > So this basically aggregates list of values by some key of a source
> stream.
> > This is working fine.
> >
> > However over time the list will grow very big, so I thought of using
> > windowed table.
> >
> > stream.aggregateByKey(new Initializer() {
> > public List apply() {
> > return new List
> > }
> > }, new Aggregator() {
> > public List apply(key, value, list) {
> > list.add(value)
> > return list
> > }
> > }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
> > valuesSerde)
> >
> > It is basically the above code, but what I find is that it aggregates
> only
> > one value for a given windowed key.
> > So size of list is always one.
> >
> > What I understood is that it will put the source values in a time bucket
> > based on their timestamp extractor. When i check the timed window I see
> > that value's timestamp between the bounds of time window.
> >
> > However I have not understood that why it is aggregating only a single
> > value always.
> >
> > So to downstream I always get something like
> >
> > (key, start, end) -> [value1]
> > (key, start, end) -> [value2]
> > and not
> > (key, start, end) -> [value1, value2]
> > note both value1 and value2 are between the start and end bonds.
> >
> > However in first case I get this
> > key -> [value1, value2] which is what I expect.
> >
> > So please let me know if I am missing something in my windowed
> aggregation.
> >
> > Or if there is something else to be done to get the output I want.
> >
> > Thanks
> > Sachin
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka windowed table not aggregating correctly

2016-11-18 Thread Guozhang Wang
Hello Sachin,

Which version of Kafka are you using for this application?


Guozhang


On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal  wrote:

> Hi,
> I have a simple pipeline
> stream.aggregateByKey(new Initializer() {
> public List apply() {
> return new List
> }
> }, new Aggregator() {
> public List apply(key, value, list) {
> list.add(value)
> return list
> }
> }, keysSerde, valuesSerde, "table")
>
> So this basically aggregates list of values by some key of a source stream.
> This is working fine.
>
> However over time the list will grow very big, so I thought of using
> windowed table.
>
> stream.aggregateByKey(new Initializer() {
> public List apply() {
> return new List
> }
> }, new Aggregator() {
> public List apply(key, value, list) {
> list.add(value)
> return list
> }
> }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
> valuesSerde)
>
> It is basically the above code, but what I find is that it aggregates only
> one value for a given windowed key.
> So size of list is always one.
>
> What I understood is that it will put the source values in a time bucket
> based on their timestamp extractor. When i check the timed window I see
> that value's timestamp between the bounds of time window.
>
> However I have not understood that why it is aggregating only a single
> value always.
>
> So to downstream I always get something like
>
> (key, start, end) -> [value1]
> (key, start, end) -> [value2]
> and not
> (key, start, end) -> [value1, value2]
> note both value1 and value2 are between the start and end bonds.
>
> However in first case I get this
> key -> [value1, value2] which is what I expect.
>
> So please let me know if I am missing something in my windowed aggregation.
>
> Or if there is something else to be done to get the output I want.
>
> Thanks
> Sachin
>



-- 
-- Guozhang


Kafka windowed table not aggregating correctly

2016-11-15 Thread Sachin Mittal
Hi,
I have a simple pipeline
stream.aggregateByKey(new Initializer() {
public List apply() {
return new List
}
}, new Aggregator() {
public List apply(key, value, list) {
list.add(value)
return list
}
}, keysSerde, valuesSerde, "table")

So this basically aggregates a list of values by some key of a source stream.
This is working fine.

However, over time the list will grow very big, so I thought of using a
windowed table.

stream.aggregateByKey(new Initializer() {
public List apply() {
return new List
}
}, new Aggregator() {
public List apply(key, value, list) {
list.add(value)
return list
}
}, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
valuesSerde)

It is basically the above code, but what I find is that it aggregates only
one value for a given windowed key.
So the size of the list is always one.

What I understood is that it will put the source values in a time bucket
based on the timestamp returned by the timestamp extractor. When I check the
time window, I see that the value's timestamp is between the bounds of the
time window.

However, I have not understood why it always aggregates only a single
value.

So to downstream I always get something like

(key, start, end) -> [value1]
(key, start, end) -> [value2]
and not
(key, start, end) -> [value1, value2]
Note that both value1 and value2 are between the start and end bounds.

However in first case I get this
key -> [value1, value2] which is what I expect.

So please let me know if I am missing something in my windowed aggregation.

Or if there is something else to be done to get the output I want.

Thanks
Sachin