Hi,
Well, it does help in the case you mentioned. But suppose that on 2017 Dec
12 12:01 AM we receive a message stamped 2017 Dec 11 11:59 PM. It will
either drop this message, or create a fresh older window, aggregate the
message into that, and then drop the window.
It is not clear which of the two it will do, but both are wrong: ideally
it should aggregate that message into the previous aggregation and not
start a fresh older aggregation (since on Dec 12 12:00 AM we drop the
older windows and create fresh ones).

Could you please explain this case?
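
To make the scenario concrete, the description of "until" quoted further
down (a window is retained for the retention period after its start time)
can be sketched as a toy model. RetentionModel and isRetained are
hypothetical names used only for illustration; this is not the Kafka
Streams implementation and it ignores how segments are actually stored.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model only: RetentionModel and isRetained are hypothetical names,
// not part of the Kafka Streams API.
final class RetentionModel {

    // Assumption: a window is kept while stream time has not moved past
    // windowStart + retention.
    static boolean isRetained(Instant windowStart, Instant streamTime, Duration retention) {
        return !streamTime.isAfter(windowStart.plus(retention));
    }

    public static void main(String[] args) {
        Duration retention = Duration.ofDays(365);
        Instant streamTime = Instant.parse("2017-12-12T00:01:00Z");

        // Window that the late record (stamped 2017-12-11 23:59) falls into,
        // assuming 1 hour windows aligned to the hour.
        Instant lateRecordWindow = Instant.parse("2017-12-11T23:00:00Z");
        // Window that started a little over one year before stream time.
        Instant yearOldWindow = Instant.parse("2016-12-11T23:00:00Z");

        System.out.println("late record window retained: "
                + isRetained(lateRecordWindow, streamTime, retention));
        System.out.println("year-old window retained: "
                + isRetained(yearOldWindow, streamTime, retention));
    }
}
```

Under this reading the late record's window would still be retained, so
whether the record is merged into the existing aggregate or dropped is
exactly the point that needs confirming.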

I am trying to reproduce this scenario and have written a small Java
program which runs against the latest Kafka source, built against trunk
at git commit 01d58ad8e039181ade742cf896a08199e3cb7483.

Here I am publishing messages with timestamps
TS, TS + 5, TS + 1, TS + 6, TS + 2, TS + 7, TS + 3, TS + 8, TS + 4,
TS + 9, TS + 5 ...
I hope you get the idea: TS is generally increasing, but the next TS can
have a value less than the previous one.
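
As a standalone illustration of that pattern (offsets in seconds relative
to TS), the sequence can be generated as below. OutOfOrderTimestamps and
offsets are hypothetical names for this sketch; the step rule mirrors the
one used in the producer further down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class OutOfOrderTimestamps {

    // Returns the first n timestamp offsets, in seconds relative to TS.
    static List<Long> offsets(int n) {
        List<Long> result = new ArrayList<>();
        long offset = 0;
        for (int count = 1; count <= n; count++) {
            result.add(offset);
            // odd steps jump 5s forward, even steps fall back 4s
            offset += (count % 2 != 0) ? 5 : -4;
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [0, 5, 1, 6, 2, 7, 3, 8, 4, 9, 5]
        System.out.println(offsets(11));
    }
}
```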

My window is
TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)
i.e. a 1 minute window advancing by 30 seconds, retained until 2 minutes,
at which point we discard the old window and create a new one.
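
For reference, here is a minimal sketch of which windows a given timestamp
falls into with this size and advance. It assumes window starts are
multiples of the advance interval counted from epoch 0; HoppingWindows and
windowStartsFor are hypothetical names, not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class HoppingWindows {

    // Start times (ms) of every window [start, start + sizeMs) that contains
    // timestampMs, assuming starts are multiples of advanceMs from epoch 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // earliest aligned start whose window can still contain the timestamp
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 1000L;
        long advance = 30 * 1000L;
        for (long ts : new long[] {10_000L, 60_000L, 95_000L}) {
            System.out.println(ts + " -> " + windowStartsFor(ts, size, advance));
        }
    }
}
```

With a 60 second size and a 30 second advance, most timestamps belong to
two overlapping windows, so each published message should show up in two
aggregates.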

What I observe is that it always aggregates the result into the first
bucket it creates, even after the until timestamp has elapsed. So I am
kind of confused here.

Could you give me some insight into the rolling window? The code is
attached below.


Thanks
Sachin
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

import java.io.ByteArrayOutputStream;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.TimestampExtractor;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TestKafkaWindowStream {

    public static void main(String[] args) {
        //start the producer
        Producer producerThread = new Producer();
        producerThread.start();
        //aggregate the messages via stream
        final Serde<Message> messageSerde = Serdes.serdeFrom(
                new MessageSerializer(), new MessageDeserializer());
        final Serde<SortedSet<Message>> messagesSerde = Serdes.serdeFrom(
                new Serializer<SortedSet<Message>>() {
            private ObjectMapper objectMapper = new ObjectMapper();
            public void close() {}
            public void configure(Map<String, ?> paramMap, boolean paramBoolean) {}
            public byte[] serialize(String paramString, SortedSet<Message> messages) {
                if (messages == null) {
                    return null;
                }
                try {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    objectMapper.writeValue(out, messages);
                    return out.toByteArray();
                } catch (Exception e) {
                    throw new SerializationException(
                            "Error serializing JSON message", e);
                }
            }
        }, new Deserializer<SortedSet<Message>>() {
            private ObjectMapper objectMapper = new ObjectMapper();
            public void close() {}
            public void configure(Map<String, ?> paramMap, boolean paramBoolean) {}
            public SortedSet<Message> deserialize(String paramString,
                    byte[] paramArrayOfByte) {
                if (paramArrayOfByte == null) {
                    return null;
                }
                SortedSet<Message> data = null;
                try {
                    data = objectMapper.readValue(paramArrayOfByte,
                            new TypeReference<TreeSet<Message>>() {});
                } catch (Exception e) {
                    throw new SerializationException(
                            "Error deserializing JSON message", e);
                }
                return data;
            }
        });
        //build the stream
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), messageSerde, "test-window-stream")
        .groupByKey()
        .aggregate(new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        }, new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value,
                    SortedSet<Message> aggregate) {
                aggregate.add(value);
                return aggregate;
            }
        }, TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L),
                messagesSerde, "stream-table")
        .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
            public void apply(Windowed<String> key, SortedSet<Message> messages) {
                // only print the windows for key "2"
                if ("2".equals(key.key())) {
                    Date start = new Date(key.window().start());
                    Date end = new Date(key.window().end());
                    System.out.println("Aggregated: "
                            + start.getMinutes() + ":" + start.getSeconds()
                            + " " + end.getMinutes() + ":" + end.getSeconds()
                            + " -> " + messages.size());
                }
            }
        });
        //configure and start the stream
        Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        streamsProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
                "localhost:2181");
        streamsProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        streamsProps.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                MessageTimestampExtractor.class);
//        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaStreams streams = new KafkaStreams(builder, streamsProps);
        streams.start();
    }

    public static class MessageSerializer implements Serializer<Message> {
        private ObjectMapper objectMapper = new ObjectMapper();
        public void configure(Map paramMap, boolean paramBoolean) {}
        public byte[] serialize(String paramString, Message message) {
            if (message == null) {
                return null;
            }
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                objectMapper.writeValue(out, message);
                return out.toByteArray();
            } catch (Exception e) {
                throw new SerializationException(
                        "Error serializing JSON message", e);
            }
        }
        public void close() {}
    }

    public static class MessageDeserializer implements Deserializer<Message> {
        private ObjectMapper objectMapper = new ObjectMapper();
        public void configure(Map paramMap, boolean paramBoolean) {}
        public Message deserialize(String paramString, byte[] paramArrayOfByte) {
            if (paramArrayOfByte == null) {
                return null;
            }
            Message data = null;
            try {
                data = objectMapper.readValue(paramArrayOfByte,
                        new TypeReference<Message>() {});
            } catch (Exception e) {
                throw new SerializationException(
                        "Error deserializing JSON message", e);
            }
            return data;
        }
        public void close() {}
    }

    public static class MessageTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts;
            } else {
                return record.timestamp();
            }
        }
    }

    public static class Message implements Comparable<Message> {
        public long ts;
        public String message;
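        public Message() {}
        public Message(long ts, String message) {
            // store the event time; MessageTimestampExtractor reads this field
            this.ts = ts;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            // never returns 0, so messages with equal timestamps are all kept in the set
            long ts1 = paramT.ts;
            return ts > ts1 ? 1 : -1;
        }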
        public String toString() {
            return "[" + message + "]";
        }
    }
}

class Producer extends Thread {
    private KafkaProducer<String, TestKafkaWindowStream.Message> producer;

    public Producer() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("client.id", "DemoProducer");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "TestKafkaWindowStream$MessageSerializer");
        producer = new KafkaProducer<String, TestKafkaWindowStream.Message>(
                producerProps);
    }

    public void run() {
        int nextKey = 2;
        int count = 0;
        long ts = System.currentTimeMillis();
        while (true) {
            Date date = new Date(ts);
            String messageStr = nextKey + " " + count;
            TestKafkaWindowStream.Message m =
                    new TestKafkaWindowStream.Message(ts, messageStr);
            try {
                producer.send(new ProducerRecord<String, TestKafkaWindowStream.Message>(
                        "test-window-stream", "" + nextKey, m)).get();
                if (2 == nextKey) {
                    System.out.println("Published: " + date.getMinutes()
                            + " " + date.getSeconds());
                }
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            count++;
            // alternate +5s and -4s so timestamps trend upward but arrive out of order
            ts = ts + (count % 2 != 0 ? 5 * 1000L : -4 * 1000L);
        }
    }
}

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


On Tue, Dec 13, 2016 at 4:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Sachin,
>
> Note that "until" means that the window will be retained for that period of
> time after the window starting time. So when you set the time to 1 year, if
> there is a message whose timestamp is 1 year + 1 sec beyond the "current
> stream time", then yes it will cause the window to be dropped. But in
> practice, if you are confident that you would not likely receive a message
> stamped 2017.Dec.12 (from your use case it seems possible that different
> sources' clocks can be shifted by a bit, but not as much as a year, right?),
> then it still helps with the problem.
>
>
> Guozhang
>
>
> On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I think the window retention period does not solve the problem, it only
> > delays it.
> > Based on what I understand, say I set the time to 1 year using until.
> > Then when I get a message with timestamp 1 year + 1 sec, it will delete
> > the old windows and create new ones from that message.
> > Now let us say we get the next message with timestamp 1 year - 1 sec; based
> > on what you said, it will ignore this message.
> >
> > In my case we get messages from different sources whose clocks are not in
> > sync. So overall, messages come with increasing timestamps, but for a short
> > duration there is no order guarantee.
> >
> > So I think before deleting the older windows it should retain a small
> > portion of the old windows too, so nearby older messages are not dropped.
> >
> > I suggest having something like windows.size.advanceBy.until.retain
> > Retain will keep the periods which fall within retain ms of the upper
> > bound.
> >
> > So a window can be defined as
> > TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 *
> > 1000L).until(365 * 24 * 3600 * 1000L).retain(900 * 1000L)
> > So when dropping older windows it will retain the ones that fall in the
> > last 15 minutes.
> >
> >
> > Please let me know in case I missed something on how and if at all older
> > messages are dropped.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> > On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Assuming your window retention period is the same as the window length,
> > > then it is true that ZZ will cause the current window to be dropped. And
> > > then when ZZA is received, it will not cause the old windows to be
> > > re-created but will be ignored, since it is considered "expired".
> > >
> > > Note that you can set the window retention period much longer than the
> > > window length itself, using the "until" API I mentioned above, to handle
> > > any sudden future records.
> > >
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> > >
> > > > Hi,
> > > > Right now, in order to circumvent this problem, I am using a timestamp
> > > > whose values increase by a few ms as and when I get new records.
> > > > So let's say I have records in order
> > > > A -> lower limit TS + 1 sec
> > > > B -> lower limit TS + 3 sec
> > > > C -> lower limit TS + 5 sec
> > > > ..
> > > > Z -> upper limit TS - 1 sec
> > > >
> > > > Now say I get a record ZZ with ts upper limit TS + 1 sec. I assume it
> > > > will drop the previous windows and create new ones based on this
> > > > timestamp. Please confirm this understanding.
> > > >
> > > > Now let's say I get a new record ZZA with timestamp (old) upper limit TS
> > > > - 1 sec. Will this again cause the new windows to be dropped and the older
> > > > windows to be recreated fresh, with all the older aggregation done so far
> > > > lost?
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Sachin,
> > > > >
> > > > > I am with you that ideally the windowing segmentation implementation
> > > > > should be totally abstracted from users, but today it is a bit confusing
> > > > > to understand. I filed a JIRA some time ago to improve on this end:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3596
> > > > >
> > > > > So to your example, if a "far future record" was received whose timestamp
> > > > > is beyond current time + the retention period, it could potentially cause
> > > > > the current window to be dropped.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > I think now it makes complete sense. The field I was using for the
> > > > > > timestamp extractor contains timestamps which span more than a day's
> > > > > > duration, and it worked for wall clock because for short durations the
> > > > > > timestamps were within a day's range.
> > > > > >
> > > > > > I wanted to understand one thing:
> > > > > > Say I have a timestamp extractor field, and as records get ingested,
> > > > > > future records will have increasing values for the timestamp.
> > > > > >
> > > > > > Now let's say the default duration is one day. At a future time a record
> > > > > > will have a timestamp which is now greater than the initial day's range.
> > > > > > What will happen then? Will it create a new segment and then create
> > > > > > windows in it for the next day's duration?
> > > > > > What happens if it now gets a record from the previous day? Will it get
> > > > > > discarded, or will it again have just the single value aggregated in it
> > > > > > (previous values are lost)?
> > > > > > So when a new segment is created, as I understand it, does it retain the
> > > > > > older segments' data?
> > > > > >
> > > > > > This is a bit confusing, so it would be helpful if you could explain in a
> > > > > > bit more detail.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Sachin,
> > > > > > >
> > > > > > > One thing to note is that the retention of the windowed stores works
> > > > > > > by keeping multiple segments of the stores, where each segment stores
> > > > > > > a time range which can potentially span multiple windows. If a new
> > > > > > > window needs to be created that is further from the oldest segment's
> > > > > > > time range + retention period, the old segments are dropped. (From
> > > > > > > your code it seems you do not override the retention period via
> > > > > > > until(...) on
> > > > > > > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > > so the default of one day is used.)
> > > > > > >
> > > > > > > So with WallclockTimestampExtractor, since it is using system time, it
> > > > > > > won't give you timestamps that span more than a day during a short
> > > > > > > period of time. But if your own defined timestamps exceed that value,
> > > > > > > then old segments will be dropped immediately and hence the aggregate
> > > > > > > values will be returned as a single value.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> > > > > matth...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > The extractor is used in
> > > > > > > >
> > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > RecordQueue#addRawRecords()
> > > > > > > >
> > > > > > > > Let us know, if you could resolve the problem or need more
> > help.
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js
> > > > client
> > > > > i
> > > > > > am
> > > > > > > > > using. The version is 0.5x. Can you please tell me what
> code
> > in
> > > > > > streams
> > > > > > > > > calls the timestamp extractor. I can look there to see if
> > there
> > > > is
> > > > > > any
> > > > > > > > > issue.
> > > > > > > > >
> > > > > > > > > Again issue happens only when producing the messages using
> > > > producer
> > > > > > > that
> > > > > > > > is
> > > > > > > > > compatible with kafka version 0.8x. I see that this
> producer
> > > does
> > > > > not
> > > > > > > > send
> > > > > > > > > a record timestamp as this was introduced in version 0.10
> > only.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Sachin
> > > > > > > > >
> > > > > > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <
> > > > matth...@confluent.io>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> I am not sure what is happening. That's why it would be
> good
> > > to
> > > > > > have a
> > > > > > > > >> toy example to reproduce the issue.
> > > > > > > > >>
> > > > > > > > >> What do you mean by "Kafka node version 0.5"?
> > > > > > > > >>
> > > > > > > > >> -Matthias
> > > > > > > > >>
> > > > > > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > > > > > >>> I can provide with the data but data does not seem to be
> > the
> > > > > issue.
> > > > > > > > >>> If I submit the same data and use same timestamp
> extractor
> > > > using
> > > > > > the
> > > > > > > > >> java
> > > > > > > > >>> client with kafka version 0.10.0.1 aggregation works
> fine.
> > > > > > > > >>> I find the issue only when submitting the data with kafka
> > > node
> > > > > > > version
> > > > > > > > >> 0.5.
> > > > > > > > >>> It looks like the stream does not extract the time
> > correctly
> > > in
> > > > > > that
> > > > > > > > >> case.
> > > > > > > > >>>
> > > > > > > > >>> Thanks
> > > > > > > > >>> Sachin
> > > > > > > > >>>
> > > > > > > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <
> > > > > matth...@confluent.io
> > > > > > >
> > > > > > > > >> wrote:
> > > > > > > > >>>
> > > > > > > > > >>>> Can you provide example input data (including timestamps) and
> > > > > > > > > >>>> result. What is the expected result (i.e., what aggregation do
> > > > > > > > > >>>> you apply)?
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> -Matthias
> > > > > > > > >>>>
> > > > > > > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > > > > > >>>>> Hi,
> > > > > > > > >>>>> After much debugging I found an issue with timestamp
> > > > extractor.
> > > > > > > > >>>>>
> > > > > > > > >>>>> If I use a custom timestamp extractor with following
> > code:
> > > > > > > > >>>>>     public static class MessageTimestampExtractor
> > > implements
> > > > > > > > >>>>> TimestampExtractor {
> > > > > > > > >>>>>         public long extract(ConsumerRecord<Object,
> > Object>
> > > > > > record)
> > > > > > > {
> > > > > > > > >>>>>             if (record.value() instanceof Message) {
> > > > > > > > >>>>>                 return ((Message) record.value()).ts;
> > > > > > > > >>>>>             } else {
> > > > > > > > >>>>>                 return record.timestamp();
> > > > > > > > >>>>>             }
> > > > > > > > >>>>>         }
> > > > > > > > >>>>>     }
> > > > > > > > >>>>>
> > > > > > > > >>>>> Here message has a long field ts which stores the
> > > timestamp,
> > > > > the
> > > > > > > > >>>>> aggregation does not work.
> > > > > > > > >>>>> Note I have checked and ts has valid timestamp values.
> > > > > > > > >>>>>
> > > > > > > > >>>>> However if I replace it with say
> > > WallclockTimestampExtractor
> > > > > > > > >> aggregation
> > > > > > > > >>>> is
> > > > > > > > >>>>> working fine.
> > > > > > > > >>>>>
> > > > > > > > >>>>> I do not understand what could be the issue here.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Also note I am using kafka streams version 0.10.0.1
> and I
> > > am
> > > > > > > > publishing
> > > > > > > > >>>>> messages via
> > > > > > > > >>>>> https://github.com/SOHU-Co/kafka-node/ whose version
> is
> > > > quite
> > > > > > old
> > > > > > > > >> 0.5.x
> > > > > > > > >>>>>
> > > > > > > > >>>>> Let me know if there is some bug in time stamp
> > extractions.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thanks
> > > > > > > > >>>>> Sachin
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <
> > > > > > > wangg...@gmail.com>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Sachin,
> > > > > > > > >>>>>>
> > > > > > > > > >>>>>> This is indeed a bit weird, and we'd like to try to reproduce
> > > > > > > > > >>>>>> your issue locally. Do you have sample input data for us to
> > > > > > > > > >>>>>> try out?
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Guozhang
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <
> > > > > > > sjmit...@gmail.com
> > > > > > > > >
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>> Hi,
> > > > > > > > >>>>>>> I fixed that sorted set issue but I am facing a weird
> > > > problem
> > > > > > > > which I
> > > > > > > > >>>> am
> > > > > > > > >>>>>>> not able to replicate.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Here is the sample problem that I could isolate:
> > > > > > > > >>>>>>> My class is like this:
> > > > > > > > >>>>>>>     public static class Message implements
> > > > > Comparable<Message>
> > > > > > {
> > > > > > > > >>>>>>>         public long ts;
> > > > > > > > >>>>>>>         public String message;
> > > > > > > > >>>>>>>         public String key;
> > > > > > > > >>>>>>>         public Message() {};
> > > > > > > > >>>>>>>         public Message(long ts, String message,
> String
> > > > key) {
> > > > > > > > >>>>>>>             this.ts = ts;
> > > > > > > > >>>>>>>             this.key = key;
> > > > > > > > >>>>>>>             this.message = message;
> > > > > > > > >>>>>>>         }
> > > > > > > > >>>>>>>         public int compareTo(Message paramT) {
> > > > > > > > >>>>>>>             long ts1 = paramT.ts;
> > > > > > > > >>>>>>>             return ts > ts1 ? 1 : -1;
> > > > > > > > >>>>>>>         }
> > > > > > > > >>>>>>>     }
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> pipeline is like this:
> > > > > > > > >>>>>>> builder.stream(Serdes.String(), messageSerde,
> > > > > > > > "test-window-stream")\
> > > > > > > > >>>>>>>  .map(new KeyValueMapper<String, Message,
> > > KeyValue<String,
> > > > > > > > >> Message>>()
> > > > > > > > >>>> {
> > > > > > > > >>>>>>>      public KeyValue<String, Message> apply(String
> key,
> > > > > Message
> > > > > > > > >> value)
> > > > > > > > >>>> {
> > > > > > > > >>>>>>>          return new KeyValue<String,
> > Message>(value.key,
> > > > > > value);
> > > > > > > > >>>>>>>       }
> > > > > > > > >>>>>>>  })
> > > > > > > > >>>>>>> .through(Serdes.String(), messageSerde,
> > > > > > "test-window-key-stream")
> > > > > > > > >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>
> >()
> > {
> > > > > > > > >>>>>>>     public SortedSet<Message> apply() {
> > > > > > > > >>>>>>>         return new TreeSet<Message>();
> > > > > > > > >>>>>>>     }
> > > > > > > > >>>>>>> }, new Aggregator<String, Message,
> > SortedSet<Message>>()
> > > {
> > > > > > > > >>>>>>>     public SortedSet<Message> apply(String aggKey,
> > > Message
> > > > > > value,
> > > > > > > > >>>>>>> SortedSet<Message> aggregate) {
> > > > > > > > >>>>>>>         aggregate.add(value);
> > > > > > > > >>>>>>>         return aggregate;
> > > > > > > > >>>>>>>     }
> > > > > > > > >>>>>>> }, TimeWindows.of("stream-table", 10 *
> > > 1000L).advanceBy(5 *
> > > > > > > 1000L),
> > > > > > > > >>>>>>> Serdes.String(), messagesSerde)
> > > > > > > > >>>>>>> .foreach(new ForeachAction<Windowed<String>,
> > > > > > > > SortedSet<Message>>() {
> > > > > > > > >>>>>>>     public void apply(Windowed<String> key,
> > > > > SortedSet<Message>
> > > > > > > > >>>> messages)
> > > > > > > > >>>>>> {
> > > > > > > > >>>>>>>         ...
> > > > > > > > >>>>>>>     }
> > > > > > > > >>>>>>> });
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> So basically I rekey the original message into
> another
> > > > topic
> > > > > > and
> > > > > > > > then
> > > > > > > > >>>>>>> aggregate it based on that key.
> > > > > > > > >>>>>>> What I have observed is that when I used windowed
> > > > aggregation
> > > > > > the
> > > > > > > > >>>>>>> aggregator does not use previous aggregated value.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> public SortedSet<Message> apply(String aggKey,
> Message
> > > > value,
> > > > > > > > >>>>>>> SortedSet<Message> aggregate) {
> > > > > > > > >>>>>>>     aggregate.add(value);
> > > > > > > > >>>>>>>     return aggregate;
> > > > > > > > >>>>>>> }
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> So in the above function the aggregate is an empty
> set
> > of
> > > > > every
> > > > > > > > value
> > > > > > > > >>>>>>> entering into pipeline. When I remove the windowed
> > > > > aggregation,
> > > > > > > the
> > > > > > > > >>>>>>> aggregate set retains previously aggregated values in
> > the
> > > > > set.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I am just not able to wrap my head around it. When I
> > ran
> > > > this
> > > > > > > type
> > > > > > > > of
> > > > > > > > >>>>>> test
> > > > > > > > >>>>>>> locally on windows it is working fine. However a
> > similar
> > > > > > pipeline
> > > > > > > > >> setup
> > > > > > > > >>>>>>> when run against production on linux is behaving
> > > strangely
> > > > > and
> > > > > > > > always
> > > > > > > > >>>>>>> getting an empty aggregate set.
> > > > > > > > >>>>>>> Any idea what could be the reason, where should I
> look
> > at
> > > > the
> > > > > > > > >> problem.
> > > > > > > > >>>>>> Does
> > > > > > > > >>>>>>> length of key string matters here? I will later try
> to
> > > run
> > > > > the
> > > > > > > same
> > > > > > > > >>>>>> simple
> > > > > > > > >>>>>>> setup on linux and see what happens. But this is a
> very
> > > > > strange
> > > > > > > > >>>> behavior.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Thanks
> > > > > > > > >>>>>>> Sachin
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <
> > > > > > > > wangg...@gmail.com>
> > > > > > > > >>>>>>> wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>> Hello Sachin,
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> In the implementation of SortedSet, if the object's
> > > > > > implemented
> > > > > > > > the
> > > > > > > > >>>>>>>> Comparable interface, that compareTo function is
> > applied
> > > > in
> > > > > "
> > > > > > > > >>>>>>>> aggregate.add(value);", and hence if it returns 0,
> > this
> > > > > > element
> > > > > > > > will
> > > > > > > > >>>>>> not
> > > > > > > > >>>>>>> be
> > > > > > > > >>>>>>>> added since it is a Set.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Guozhang
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <
> > > > > > > > sjmit...@gmail.com
> > > > > > > > >>>
> > > > > > > > >>>>>>>> wrote:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>> Hi,
> > > > > > > > >>>>>>>>> What I find is that when I use sorted set as
> > > aggregation
> > > > it
> > > > > > > fails
> > > > > > > > >> to
> > > > > > > > >>>>>>>>> aggregate the values which have compareTo returning
> > 0.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> My class is like this:
> > > > > > > > >>>>>>>>>     public class Message implements
> > > Comparable<Message> {
> > > > > > > > >>>>>>>>>         public long ts;
> > > > > > > > >>>>>>>>>         public String message;
> > > > > > > > >>>>>>>>>         public Message() {};
> > > > > > > > >>>>>>>>>         public Message(long ts, String message) {
> > > > > > > > >>>>>>>>>             this.ts = ts;
> > > > > > > > >>>>>>>>>             this.message = message;
> > > > > > > > >>>>>>>>>         }
> > > > > > > > >>>>>>>>>         public int compareTo(Message paramT) {
> > > > > > > > >>>>>>>>>             long ts1 = paramT.ts;
> > > > > > > > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 :
> -1;
> > > > > > > > >>>>>>>>>         }
> > > > > > > > >>>>>>>>>     }
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> pipeline is like this:
> > > > > > > > > >>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > > > > > > >>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > > > > > > >>>>>>>>>     public SortedSet<Message> apply() {
> > > > > > > > > >>>>>>>>>         return new TreeSet<Message>();
> > > > > > > > > >>>>>>>>>     }
> > > > > > > > > >>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > > > > > > >>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> > > > > > > > > >>>>>>>>>             SortedSet<Message> aggregate) {
> > > > > > > > > >>>>>>>>>         aggregate.add(value);
> > > > > > > > > >>>>>>>>>         return aggregate;
> > > > > > > > > >>>>>>>>>     }
> > > > > > > > > >>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > > > > >>>>>>>>> Serdes.String(), messagesSerde)
> > > > > > > > > >>>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > > > > > > >>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > > > > > > >>>>>>>>>         ...
> > > > > > > > > >>>>>>>>>     }
> > > > > > > > > >>>>>>>>> });
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> So any message published between 10 and 20 seconds gets
> > > > > > > > > >>>>>>>>> aggregated in the 10 - 20 bucket, and I print the size of
> > > > > > > > > >>>>>>>>> the set.
> > > > > > > > > >>>>>>>>> However, the output I get is the following:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 14
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 1
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 18
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 2
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 11
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 3
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 17
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 14
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 15
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 5
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 12
> > > > > > > > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Published: 12
> > > > > > > > >>>>>>>>> Aggregated: 10  20 -> 6
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> So, as you can see, any message that occurs again for the
> > > > > > > > > >>>>>>>>> same second, where compareTo returns 0, fails to get
> > > > > > > > > >>>>>>>>> aggregated in the pipeline.
> > > > > > > > > >>>>>>>>> Notice the ones published at 14 and 12 seconds.
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Now I am not sure if the problem is with Java, i.e. I
> > > > > > > > > >>>>>>>>> should use the Comparator interface and not Comparable for
> > > > > > > > > >>>>>>>>> my Message object, or if the problem is with Kafka Streams
> > > > > > > > > >>>>>>>>> or with serializing and de-serializing the set of messages.
> > > > > > > > > >>>>>>>>> If I replace the Set with a List, all is working fine.
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let
> > > > > > > > > >>>>>>>>> me see what the best Java practice is here.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Thanks
> > > > > > > > >>>>>>>>> Sachin
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> > > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> > > > > > > > > >>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > > > > > > > >>>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> > > > > > > > > >>>>>>>>>>> If the streams app, using a timestamp extractor, puts the
> > > > > > > > > >>>>>>>>>>> message in one or more bucket(s), it will get aggregated in
> > > > > > > > > >>>>>>>>>>> those buckets.
> > > > > > > > > >>>>>>>>>>> I assume this statement is correct.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Yes.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Also, say when I restart the streams application, bucket
> > > > > > > > > >>>>>>>>>>> aggregation will resume from the last point of halt.
> > > > > > > > > >>>>>>>>>>> I hope this is also correct.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Yes.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> What I noticed was that once a message is placed in one
> > > > > > > > > >>>>>>>>>>> bucket, that bucket was not getting new messages.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> This should not happen...
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> However, when I ran a small test case replicating that, it
> > > > > > > > > >>>>>>>>>>> is working properly. There may be some issues in application
> > > > > > > > > >>>>>>>>>>> reset.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> ...and apparently it works (as expected) in your small test
> > > > > > > > > >>>>>>>>>> case.
> > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Do you have any further information that you could share
> > > > > > > > > >>>>>>>>>> with us so we can help you better?  What's the difference,
> > > > > > > > > >>>>>>>>>> for example, between your "normal" use case and the small
> > > > > > > > > >>>>>>>>>> test case you have been referring to?
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> -Michael
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> --
> > > > > > > > >>>>>>>> -- Guozhang
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> --
> > > > > > > > >>>>>> -- Guozhang
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>