Just increase the retention time so the window is not dropped and can accept late-arriving data.
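For example, with the TimeWindows API used in the program below, keeping the 1-minute windows around for a week instead of the default one day would look like this (a sketch; the 7-day figure is illustrative):

    // Retain each 1-minute window (advancing every 30 seconds) for at
    // least 7 days, so records arriving up to a week late can still be
    // folded into the window they belong to.
    TimeWindows.of(60 * 1000L)
               .advanceBy(30 * 1000L)
               .until(7 * 24 * 3600 * 1000L);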
About your example: the retention time specified via until() is a minimum
retention time! It can happen that a window is kept longer.

-Matthias

On 12/12/16 11:49 PM, Sachin Mittal wrote:
> Hi,
> Well, it does help in the case you mentioned, but consider this case:
> if on 2017 Dec 12 12:01 AM we receive a message stamped 2017 Dec 11
> 11:59 PM, it will either drop this message, or create a fresh older
> window, aggregate the message into that, and then drop the window.
> It is not clear which of the two it will do, but both cases are wrong:
> ideally it should have aggregated that message into the previous
> aggregation, not started a fresh older aggregation (since on Dec 12
> 12:00 AM we drop older windows and create fresh ones).
>
> Could you please explain this case?
>
> I am trying to reproduce this scenario and have written a small Java
> program which runs against the latest Kafka source, built against trunk
> git commit 01d58ad8e039181ade742cf896a08199e3cb7483.
>
> Here I am publishing messages with timestamps
> TS, TS + 5, TS + 1, TS + 6, TS + 2, TS + 7, TS + 3, TS + 8, TS + 4,
> TS + 9, TS + 5 ...
> i.e. TS is generally increasing, but the next TS can have a value less
> than the previous one.
>
> My window is
> TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)
> i.e. 1 minute, rolling by 30 seconds, and until 2 minutes, after which
> we discard the old windows and create new ones.
>
> What I observe is that it always aggregates the result into the first
> bucket it creates, even after the until time has elapsed. So I am kind
> of confused here.
>
> See if you can give me some insight into rolling windows. Here is the
> code:
>
> Thanks
> Sachin
> ----------------------------------------------------------------------
>
> import java.io.ByteArrayOutputStream;
> import java.util.Date;
> import java.util.Map;
> import java.util.Properties;
> import java.util.SortedSet;
> import java.util.TreeSet;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Aggregator;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.Initializer;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.TimeWindows;
> import org.apache.kafka.streams.kstream.Windowed;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> import com.fasterxml.jackson.core.type.TypeReference;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> public class TestKafkaWindowStream {
>
>     public static void main(String[] args) {
>         // start the producer
>         Producer producerThread = new Producer();
>         producerThread.start();
>
>         // serdes for Message and for the aggregated SortedSet<Message>
>         final Serde<Message> messageSerde =
>             Serdes.serdeFrom(new MessageSerializer(), new MessageDeserializer());
>         final Serde<SortedSet<Message>> messagesSerde =
>             Serdes.serdeFrom(new Serializer<SortedSet<Message>>() {
>                 private ObjectMapper objectMapper = new ObjectMapper();
>                 public void close() {}
>                 public void configure(Map<String, ?> paramMap, boolean paramBoolean) {}
>                 public byte[] serialize(String paramString, SortedSet<Message> messages) {
>                     if (messages == null) {
>                         return null;
>                     }
>                     try {
>                         ByteArrayOutputStream out = new ByteArrayOutputStream();
>                         objectMapper.writeValue(out, messages);
>                         return out.toByteArray();
>                     } catch (Exception e) {
>                         throw new SerializationException("Error serializing JSON message", e);
>                     }
>                 }
>             }, new Deserializer<SortedSet<Message>>() {
>                 private ObjectMapper objectMapper = new ObjectMapper();
>                 public void close() {}
>                 public void configure(Map<String, ?> paramMap, boolean paramBoolean) {}
>                 public SortedSet<Message> deserialize(String paramString, byte[] paramArrayOfByte) {
>                     if (paramArrayOfByte == null) {
>                         return null;
>                     }
>                     SortedSet<Message> data = null;
>                     try {
>                         data = objectMapper.readValue(paramArrayOfByte,
>                             new TypeReference<TreeSet<Message>>() {});
>                     } catch (Exception e) {
>                         throw new SerializationException("Error deserializing JSON message", e);
>                     }
>                     return data;
>                 }
>             });
>
>         // build the stream: aggregate messages per key into a sorted set
>         KStreamBuilder builder = new KStreamBuilder();
>         builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>             .groupByKey()
>             .aggregate(new Initializer<SortedSet<Message>>() {
>                 public SortedSet<Message> apply() {
>                     return new TreeSet<Message>();
>                 }
>             }, new Aggregator<String, Message, SortedSet<Message>>() {
>                 public SortedSet<Message> apply(String aggKey, Message value,
>                         SortedSet<Message> aggregate) {
>                     aggregate.add(value);
>                     return aggregate;
>                 }
>             }, TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L),
>                 messagesSerde, "stream-table")
>             .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>                 public void apply(Windowed<String> key, SortedSet<Message> messages) {
>                     if ("2".equals(key.key())) {
>                         Date start = new Date(key.window().start());
>                         Date end = new Date(key.window().end());
>                         System.out.println("Aggregated: "
>                             + start.getMinutes() + ":" + start.getSeconds()
>                             + " " + end.getMinutes() + ":" + end.getSeconds()
>                             + " -> " + messages.size());
>                     }
>                 }
>             });
>
>         // configure and start the stream
>         Properties streamsProps = new Properties();
>         streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         streamsProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>         streamsProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>             "org.apache.kafka.common.serialization.Serdes$StringSerde");
>         streamsProps.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>             MessageTimestampExtractor.class);
>         // streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         KafkaStreams streams = new KafkaStreams(builder, streamsProps);
>         streams.start();
>     }
>
>     public static class MessageSerializer implements Serializer<Message> {
>         private ObjectMapper objectMapper = new ObjectMapper();
>         public void configure(Map paramMap, boolean paramBoolean) {}
>         public byte[] serialize(String paramString, Message message) {
>             if (message == null) {
>                 return null;
>             }
>             try {
>                 ByteArrayOutputStream out = new ByteArrayOutputStream();
>                 objectMapper.writeValue(out, message);
>                 return out.toByteArray();
>             } catch (Exception e) {
>                 throw new SerializationException("Error serializing JSON message", e);
>             }
>         }
>         public void close() {}
>     }
>
>     public static class MessageDeserializer implements Deserializer<Message> {
>         private ObjectMapper objectMapper = new ObjectMapper();
>         public void configure(Map paramMap, boolean paramBoolean) {}
>         public Message deserialize(String paramString, byte[] paramArrayOfByte) {
>             if (paramArrayOfByte == null) {
>                 return null;
>             }
>             Message data = null;
>             try {
>                 data = objectMapper.readValue(paramArrayOfByte, new TypeReference<Message>() {});
>             } catch (Exception e) {
>                 throw new SerializationException("Error deserializing JSON message", e);
>             }
>             return data;
>         }
>         public void close() {}
>     }
>
>     public static class MessageTimestampExtractor implements TimestampExtractor {
>         public long extract(ConsumerRecord<Object, Object> record) {
>             if (record.value() instanceof Message) {
>                 return ((Message) record.value()).ts;
>             } else {
>                 return record.timestamp();
>             }
>         }
>     }
>
>     public static class Message implements Comparable<Message> {
>         public long ts;
>         public String message;
>         public Message() {};
>         public Message(long ts, String message) {
>             // note: the original post omitted this assignment, so every
>             // message had ts == 0 and would land in the very first window
>             this.ts = ts;
>             this.message = message;
>         }
>         public int compareTo(Message paramT) {
>             // never returns 0, so equal-ts messages are not collapsed by
>             // the TreeSet (see the compareTo discussion further down)
>             long ts1 = paramT.ts;
>             return ts > ts1 ? 1 : -1;
>         }
>         public String toString() {
>             return "[" + message + "]";
>         }
>     }
> }
>
> class Producer extends Thread {
>     private KafkaProducer<String, TestKafkaWindowStream.Message> producer;
>
>     public Producer() {
>         Properties producerProps = new Properties();
>         producerProps.put("bootstrap.servers", "localhost:9092");
>         producerProps.put("client.id", "DemoProducer");
>         producerProps.put("key.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         producerProps.put("value.serializer",
>             "TestKafkaWindowStream$MessageSerializer");
>         producer = new KafkaProducer<String, TestKafkaWindowStream.Message>(producerProps);
>     }
>
>     public void run() {
>         int nextKey = 2;
>         int count = 0;
>         long ts = System.currentTimeMillis();
>         while (true) {
>             Date date = new Date(ts);
>             String messageStr = nextKey + " " + count;
>             TestKafkaWindowStream.Message m =
>                 new TestKafkaWindowStream.Message(ts, messageStr);
>             try {
>                 producer.send(new ProducerRecord<String, TestKafkaWindowStream.Message>(
>                     "test-window-stream", "" + nextKey, m)).get();
>                 if (2 == nextKey) {
>                     System.out.println("Published: " + date.getMinutes()
>                         + " " + date.getSeconds());
>                 }
>                 Thread.sleep(1000);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             count++;
>             // alternate +5s / -4s so timestamps generally increase but
>             // occasionally step backwards
>             ts = ts + (count % 2 != 0 ? 5 * 1000L : -4 * 1000L);
>         }
>     }
> }
> ----------------------------------------------------------------------
>
> On Tue, Dec 13, 2016 at 4:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Sachin,
>>
>> Note that "until" means that the window will be retained for that
>> period of time after the window starting time. So when you set the
>> time to 1 year, if there is a message whose timestamp is 1 year + 1
>> sec beyond the "current stream time", then yes, it will cause the
>> window to be dropped. But in practice, if you are confident that you
>> would likely not receive a message stamped 2017.Dec.12 (from your use
>> case it seems possible that different sources' clocks can be shifted
>> by a bit, but not by as much as a year, right?), then it still helps
>> with the problem.
>>
>> Guozhang
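To make that description concrete, here is a small sketch of the retention rule (plain arithmetic, not actual Streams internals; streamTime stands for the largest timestamp seen so far):

    // Sketch of the rule described above: a window [start, start + size)
    // is retained while start + retentionMs >= streamTime, so a late
    // record can only be added while its window still survives.
    class RetentionSketch {
        static boolean windowStillRetained(long recordTs, long streamTime,
                                           long windowSizeMs, long retentionMs) {
            // aligned window start; with hopping windows the record also
            // falls into earlier overlapping windows, this checks the
            // window aligned at the record's timestamp
            long windowStart = recordTs - (recordTs % windowSizeMs);
            return windowStart + retentionMs >= streamTime;
        }
    }

With the settings from the program above (60-second windows, until(2 * 60 * 1000L)), a record stamped Dec 11 11:59 PM would still be accepted when stream time reaches Dec 12 12:01 AM, because its window start plus two minutes just reaches 12:01 AM; with a retention of only one window length it would already be dropped.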
>>
>> On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I think the windows retention period does not solve the problem, it
>>> only delays it.
>>> Based on what I understand, say I set the time to 1 year using until().
>>> Then when I get a message with timestamp 1 year + 1 sec, it will
>>> delete the old windows and create new ones from that message.
>>> Now let us say we get the next message with timestamp 1 year - 1 sec;
>>> based on what you said, it will ignore this message.
>>>
>>> In my case we get messages from different sources whose clocks are not
>>> in sync. Overall, messages come with increasing timestamps, but for a
>>> short duration there is no order guarantee.
>>>
>>> So I think that before deleting the older windows it should retain a
>>> small portion of the old windows too, so nearby older messages are not
>>> dropped.
>>>
>>> I suggest something like windows.size.advanceBy.until.retain, where
>>> retain keeps the periods that fall within retain ms of the upper
>>> bound. So a window could be defined as
>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
>>>     .until(365 * 24 * 3600 * 1000L).retain(900 * 1000L)
>>> so that when dropping older windows it retains the ones that fall in
>>> the last 15 minutes.
>>>
>>> Please let me know in case I missed something about how, and whether
>>> at all, older messages are dropped.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Assuming your windows retention period is the same as the window
>>>> length, then it is true that ZZ will cause the current window to be
>>>> dropped. And then when ZZA is received, it will not cause the old
>>>> windows to be re-created; it will be ignored, since it is considered
>>>> "expired".
>>>>
>>>> Note that you can set the window retention period much longer than
>>>> the window length itself, using the "until" API I mentioned above, to
>>>> handle any sudden future records.
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Right now, in order to circumvent this problem, I am using a
>>>>> timestamp whose values increase by a few ms as and when I get new
>>>>> records.
>>>>> So let's say I have records in this order:
>>>>> A -> lower limit TS + 1 sec
>>>>> B -> lower limit TS + 3 sec
>>>>> C -> lower limit TS + 5 sec
>>>>> ..
>>>>> Z -> upper limit TS - 1 sec
>>>>>
>>>>> Now say I get a record ZZ with ts = upper limit TS + 1 sec. I assume
>>>>> it will drop the previous windows and create new ones based on this
>>>>> timestamp. Please confirm this understanding.
>>>>>
>>>>> Now let's say I get a new record ZZA with timestamp (old) upper
>>>>> limit TS - 1 sec. Will this again cause the new windows to be
>>>>> dropped and the older windows recreated fresh, with all the older
>>>>> aggregation done so far lost?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hello Sachin,
>>>>>>
>>>>>> I am with you that ideally the windowing segmentation implementation
>>>>>> should be totally abstracted from users, but today it is a bit
>>>>>> confusing to understand. I have filed a JIRA some time ago to improve
>>>>>> on this end:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/KAFKA-3596
>>>>>>
>>>>>> So to your example: if a "far future record" is received whose
>>>>>> timestamp is beyond the current time + the retention period, it could
>>>>>> potentially cause the current window to be dropped.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I think now it all makes sense. The field I was using for the
>>>>>>> timestamp extractor contains timestamps which span more than a day's
>>>>>>> duration, and it worked with the wall clock because over a short
>>>>>>> period the wall-clock timestamps stayed within a day's range.
>>>>>>>
>>>>>>> I wanted to understand one thing:
>>>>>>> Say I have a timestamp extractor field, and as records get ingested,
>>>>>>> future records have increasing values for the timestamp.
>>>>>>>
>>>>>>> Now let's say the default duration is one day. At some future time a
>>>>>>> record will have a timestamp greater than the initial day's range.
>>>>>>> What happens then: will it create a new segment and then create
>>>>>>> windows in it for the next day's duration?
>>>>>>> What happens if it then gets a record from the previous day: will it
>>>>>>> be discarded, or will it again have just a single value aggregated
>>>>>>> in it (the previous values being lost)?
>>>>>>> That is, when a new segment is created, does it retain the older
>>>>>>> segments' data?
>>>>>>>
>>>>>>> This is a bit confusing, so it would be helpful if you could explain
>>>>>>> it in a bit more detail.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>> On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> One thing to note is that the retention of the windowed stores
>>>>>>>> works by keeping multiple segments of the stores, where each
>>>>>>>> segment stores a time range which can potentially span multiple
>>>>>>>> windows. If a new window needs to be created that is further from
>>>>>>>> the oldest segment's time range + retention period (from your code
>>>>>>>> it seems you do not override it via until(...) on
>>>>>>>> TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L)),
>>>>>>>> the default of one day is used.
>>>>>>>>
>>>>>>>> So with WallclockTimestampExtractor, since it is using system
>>>>>>>> time, it won't give you timestamps that span more than a day
>>>>>>>> within a short period of time; but if your own defined timestamps
>>>>>>>> exceed that range, then old segments will be dropped immediately,
>>>>>>>> and hence the aggregate value will be returned as a single value.
>>>>>>>>
>>>>>>>> Guozhang
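A rough sketch of those segment mechanics (illustrative arithmetic only, an assumption about the shape of the scheme rather than the actual store implementation):

    // Windows live inside coarse-grained segments; expiry happens per
    // segment, so a whole range of old windows is dropped at once when
    // stream time moves far enough ahead.
    class SegmentSketch {
        static long segmentId(long timestampMs, long segmentIntervalMs) {
            return timestampMs / segmentIntervalMs;
        }

        static boolean recordExpired(long timestampMs, long streamTimeMs,
                                     long segmentIntervalMs, long retentionMs) {
            long oldestLiveSegment =
                segmentId(streamTimeMs - retentionMs, segmentIntervalMs);
            return segmentId(timestampMs, segmentIntervalMs) < oldestLiveSegment;
        }
    }

This is why a record from the previous day is not re-aggregated: once its segment has been dropped, a write for that time range either starts from scratch or is ignored, as described above.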
>>>>>>>>
>>>>>>>> On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> The extractor is used in
>>>>>>>>> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>>>>>>>>>
>>>>>>>>> Let us know if you could resolve the problem or need more help.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 12/2/16 11:46 AM, Sachin Mittal wrote:
>>>>>>>>>> https://github.com/SOHU-Co/kafka-node/ is the node.js client I am
>>>>>>>>>> using. The version is 0.5.x. Can you please tell me what code in
>>>>>>>>>> Streams calls the timestamp extractor? I can look there to see if
>>>>>>>>>> there is any issue.
>>>>>>>>>>
>>>>>>>>>> Again, the issue happens only when producing the messages using a
>>>>>>>>>> producer that is compatible with Kafka version 0.8.x. I see that
>>>>>>>>>> this producer does not send a record timestamp, as this was
>>>>>>>>>> introduced only in version 0.10.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
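Since a pre-0.10 producer cannot attach a record timestamp, an extractor can guard against the missing value explicitly. A sketch, reusing the Message class and imports from the program above; the wall-clock fallback is one possible choice, not the only one:

    public static class SafeTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts; // embedded timestamp
            }
            long ts = record.timestamp();
            // Records written by 0.8.x producers carry no timestamp; fall
            // back to wall-clock time rather than returning a negative value.
            return ts >= 0 ? ts : System.currentTimeMillis();
        }
    }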
>>>>>>>>>>
>>>>>>>>>> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am not sure what is happening. That's why it would be good to
>>>>>>>>>>> have a toy example to reproduce the issue.
>>>>>>>>>>>
>>>>>>>>>>> What do you mean by "Kafka node version 0.5"?
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>>>>>>>>>>> I can provide the data, but the data does not seem to be the
>>>>>>>>>>>> issue. If I submit the same data and use the same timestamp
>>>>>>>>>>>> extractor using the Java client with Kafka version 0.10.0.1,
>>>>>>>>>>>> aggregation works fine.
>>>>>>>>>>>> I find the issue only when submitting the data with kafka-node
>>>>>>>>>>>> version 0.5. It looks like the stream does not extract the time
>>>>>>>>>>>> correctly in that case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>>>>>
>>>>>>>>>>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you provide example input data (including timestamps) and
>>>>>>>>>>>>> the result? What is the expected result (i.e., what aggregation
>>>>>>>>>>>>> do you apply)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> After much debugging I found an issue with the timestamp
>>>>>>>>>>>>>> extractor.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I use a custom timestamp extractor with the following code:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public static class MessageTimestampExtractor implements
>>>>>>>>>>>>>>             TimestampExtractor {
>>>>>>>>>>>>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>>>>>>>>>>>>             if (record.value() instanceof Message) {
>>>>>>>>>>>>>>                 return ((Message) record.value()).ts;
>>>>>>>>>>>>>>             } else {
>>>>>>>>>>>>>>                 return record.timestamp();
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> where the message has a long field ts which stores the
>>>>>>>>>>>>>> timestamp, the aggregation does not work.
>>>>>>>>>>>>>> Note I have checked, and ts has valid timestamp values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, if I replace it with, say, WallclockTimestampExtractor,
>>>>>>>>>>>>>> aggregation works fine.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do not understand what the issue could be here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also note I am using Kafka Streams version 0.10.0.1, and I am
>>>>>>>>>>>>>> publishing messages via
>>>>>>>>>>>>>> https://github.com/SOHU-Co/kafka-node/ whose version is quite
>>>>>>>>>>>>>> old, 0.5.x.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there is some bug in timestamp extraction.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sachin,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is indeed a bit weird, and we'd like to try to reproduce
>>>>>>>>>>>>>>> your issue locally. Do you have sample input data for us to
>>>>>>>>>>>>>>> try out?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> I fixed that sorted-set issue, but I am facing a weird
>>>>>>>>>>>>>>>> problem which I am not able to replicate.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here is the sample problem that I could isolate:
>>>>>>>>>>>>>>>> My class is like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public static class Message implements Comparable<Message> {
>>>>>>>>>>>>>>>>         public long ts;
>>>>>>>>>>>>>>>>         public String message;
>>>>>>>>>>>>>>>>         public String key;
>>>>>>>>>>>>>>>>         public Message() {};
>>>>>>>>>>>>>>>>         public Message(long ts, String message, String key) {
>>>>>>>>>>>>>>>>             this.ts = ts;
>>>>>>>>>>>>>>>>             this.key = key;
>>>>>>>>>>>>>>>>             this.message = message;
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>>>>>>>>>>             long ts1 = paramT.ts;
>>>>>>>>>>>>>>>>             return ts > ts1 ? 1 : -1;
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The pipeline is like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>>>>>>>         .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>>>>>>>>>>>>             public KeyValue<String, Message> apply(String key, Message value) {
>>>>>>>>>>>>>>>>                 return new KeyValue<String, Message>(value.key, value);
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>         })
>>>>>>>>>>>>>>>>         .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>>>>>>>>>>>>         .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>>>>>>>             public SortedSet<Message> apply() {
>>>>>>>>>>>>>>>>                 return new TreeSet<Message>();
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>         }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>             public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>                     SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>                 aggregate.add(value);
>>>>>>>>>>>>>>>>                 return aggregate;
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>         }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>>>>>>>             Serdes.String(), messagesSerde)
>>>>>>>>>>>>>>>>         .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>             public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>>>>>>>                 ...
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>         });
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So basically I rekey the original message into another topic
>>>>>>>>>>>>>>>> and then aggregate it based on that key.
>>>>>>>>>>>>>>>> What I have observed is that when I use windowed aggregation,
>>>>>>>>>>>>>>>> the aggregator does not use the previous aggregated value:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>             SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>         aggregate.add(value);
>>>>>>>>>>>>>>>>         return aggregate;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In the above function the aggregate is an empty set for every
>>>>>>>>>>>>>>>> value entering the pipeline. When I remove the windowed
>>>>>>>>>>>>>>>> aggregation, the aggregate set retains previously aggregated
>>>>>>>>>>>>>>>> values in the set.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am just not able to wrap my head around it. When I ran this
>>>>>>>>>>>>>>>> type of test locally on Windows it worked fine. However a
>>>>>>>>>>>>>>>> similar pipeline setup, when run against production on Linux,
>>>>>>>>>>>>>>>> behaves strangely and always gets an empty aggregate set.
>>>>>>>>>>>>>>>> Any idea what could be the reason, and where should I look
>>>>>>>>>>>>>>>> for the problem? Does the length of the key string matter
>>>>>>>>>>>>>>>> here? I will later try to run the same simple setup on Linux
>>>>>>>>>>>>>>>> and see what happens. But this is very strange behavior.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Sachin,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In the implementation of SortedSet, if the object implements
>>>>>>>>>>>>>>>>> the Comparable interface, that compareTo function is applied
>>>>>>>>>>>>>>>>> in "aggregate.add(value);", and hence if it returns 0, the
>>>>>>>>>>>>>>>>> element will not be added, since it is a Set.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
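If messages with equal timestamps must all be kept in the set, one way out is to make compareTo consistent by breaking ties on another field instead of returning 0. A sketch against the Message class from this thread:

    public int compareTo(Message other) {
        if (ts != other.ts) {
            return ts > other.ts ? 1 : -1;
        }
        // Equal timestamps: fall back to the payload, so two distinct
        // messages never compare as 0 (a TreeSet silently drops an
        // element that compares equal to one it already contains).
        return message.compareTo(other.message);
    }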
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> What I find is that when I use a sorted set as the
>>>>>>>>>>>>>>>>>> aggregation, it fails to aggregate the values for which
>>>>>>>>>>>>>>>>>> compareTo returns 0.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My class is like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public class Message implements Comparable<Message> {
>>>>>>>>>>>>>>>>>>         public long ts;
>>>>>>>>>>>>>>>>>>         public String message;
>>>>>>>>>>>>>>>>>>         public Message() {};
>>>>>>>>>>>>>>>>>>         public Message(long ts, String message) {
>>>>>>>>>>>>>>>>>>             this.ts = ts;
>>>>>>>>>>>>>>>>>>             this.message = message;
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>>>>>>>>>>>>             long ts1 = paramT.ts;
>>>>>>>>>>>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The pipeline is like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>>>>>>>>>         .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>             public SortedSet<Message> apply() {
>>>>>>>>>>>>>>>>>>                 return new TreeSet<Message>();
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>         }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>             public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>>>                     SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>>>                 aggregate.add(value);
>>>>>>>>>>>>>>>>>>                 return aggregate;
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>         }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>>>>>>>>>             Serdes.String(), messagesSerde)
>>>>>>>>>>>>>>>>>>         .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>             public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>>>>>>>>>                 ...
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>         });
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So any message published between 10 and 20 seconds gets
>>>>>>>>>>>>>>>>>> aggregated in the 10-20 bucket, and I print the size of the
>>>>>>>>>>>>>>>>>> set. However, the output I get is the following:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 14
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 1
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 18
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 2
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 11
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 3
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 17
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 4
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 14
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 4
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 15
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 5
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 12
>>>>>>>>>>>>>>>>>> Aggregated: key2 10 20 -> 6
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 12
>>>>>>>>>>>>>>>>>> Aggregated: 10 20 -> 6
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So if you see, any message that occurs again in the same
>>>>>>>>>>>>>>>>>> second, where compareTo returns 0, fails to get aggregated
>>>>>>>>>>>>>>>>>> in the pipeline. Notice the ones published at 14 and 12
>>>>>>>>>>>>>>>>>> seconds.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Now I am not sure if the problem is with Java, i.e. whether
>>>>>>>>>>>>>>>>>> I should use the Comparator interface rather than Comparable
>>>>>>>>>>>>>>>>>> for my Message object, or whether the problem is with Kafka
>>>>>>>>>>>>>>>>>> Streams or with serializing and deserializing the set of
>>>>>>>>>>>>>>>>>> messages. If I replace the Set with a List, everything works
>>>>>>>>>>>>>>>>>> fine.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let
>>>>>>>>>>>>>>>>>> me see what the best Java practice is here.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>>>>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15
>>>>>>>>>>>>>>>>>>>> minutes. If the stream app using the timestamp extractor
>>>>>>>>>>>>>>>>>>>> puts the message in one or more bucket(s), it will get
>>>>>>>>>>>>>>>>>>>> aggregated in those buckets.
>>>>>>>>>>>>>>>>>>>> I assume this statement is correct.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also say when I restart the streams application, then
>>>>>>>>>>>>>>>>>>>> bucket aggregation will resume from the last point of
>>>>>>>>>>>>>>>>>>>> halt. I hope this is also correct.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What I noticed is that once a message was placed in one
>>>>>>>>>>>>>>>>>>>> bucket, that bucket was not getting new messages.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This should not happen...
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> However, when I ran a small test case replicating that, it
>>>>>>>>>>>>>>>>>>>> worked properly. There may be some issues in application
>>>>>>>>>>>>>>>>>>>> reset.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ...and apparently it works (as expected) in your small test
>>>>>>>>>>>>>>>>>>> case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Do you have any further information that you could share
>>>>>>>>>>>>>>>>>>> with us so we can help you better? What's the difference,
>>>>>>>>>>>>>>>>>>> for example, between your "normal" use case and the small
>>>>>>>>>>>>>>>>>>> test case you have been referring to?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Michael
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>
>>>> --
>>>> -- Guozhang
>>
>> --
>> -- Guozhang