Hi, I get that, but I want to output that key so I can store it in Elasticsearch grouped by the minute.
I explained this with data examples above, but just to be sure: let's pretend the current wall-clock time is 2022-02-14T11:38:01.123Z and I get the clicks below:

event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/another-article
event time here (ignored/not read)|cnn.com|/some-article

The output should be:

2022-02-14T11:38:00.000Z (the wall time rounded down to the minute)|cnn.com|/some-article count = 3
2022-02-14T11:38:00.000Z (the wall time rounded down to the minute)|cnn.com|/another-article count = 1

On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:

Hello John,

That is exactly what the window operator does for you. Can you please check the documentation [1] and let us know which part of the window operator alone does not suffice for your use case?

Sincerely,

Ali

[1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows

On Mon, Feb 14, 2022 at 4:03 PM John Smith <java.dev....@gmail.com> wrote:

Because I want to group them for the last X minutes; in this case, the last 1 minute.

On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:

Hello John,

Then may I ask why you need to use a time attribute as part of your key? Why not just key by fields like `mydomain.com` and `some-article` in your example and use only the window operator to group elements based on time?

Sincerely,

Ali

On Mon, Feb 14, 2022 at 3:55 PM John Smith <java.dev....@gmail.com> wrote:

Hi, thanks. As previously mentioned, processing time. Regardless of when the event was generated, I want to count all the events I have right now (as soon as they are seen by the Flink job).
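John's example at the top of the thread (wall time 2022-02-14T11:38:01.123Z, four clicks) can be reproduced outside Flink with a small simulation. The sketch below is plain Java, not Flink code: it keys each click only by domain and path, as Ali suggests, and derives the minute from the processing time by flooring it to a multiple of the window size. That flooring matches how Flink aligns tumbling windows by default (epoch-aligned, zero offset), so in a real job the same value should be available in a `ProcessWindowFunction` through `context.window().getStart()` instead of being baked into the key. The class `MinuteBucketCounts` and its `Click` type are hypothetical names used only for illustration.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink code): count clicks per (window start, domain, path),
// where the window start comes from processing time and is never stored in the element.
final class MinuteBucketCounts {

    // Hypothetical click type; the event time is deliberately left out.
    static final class Click {
        final String domain;
        final String path;

        Click(String domain, String path) {
            this.domain = domain;
            this.path = path;
        }
    }

    // Floors a timestamp to a multiple of the window size (epoch-aligned, zero offset).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Groups by domain|path only, then prefixes each count with the window start,
    // which is what a window function would attach when the window fires.
    static Map<String, Long> countPerWindow(Instant processingTime, long windowSizeMillis, List<Click> clicks) {
        Map<String, Long> perKey = new LinkedHashMap<>();
        for (Click click : clicks) {
            perKey.merge(click.domain + "|" + click.path, 1L, Long::sum);
        }
        String start = Instant.ofEpochMilli(windowStart(processingTime.toEpochMilli(), windowSizeMillis)).toString();
        Map<String, Long> result = new LinkedHashMap<>();
        perKey.forEach((key, count) -> result.put(start + "|" + key, count));
        return result;
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("cnn.com", "/some-article"),
                new Click("cnn.com", "/some-article"),
                new Click("cnn.com", "/another-article"),
                new Click("cnn.com", "/some-article"));
        Instant now = Instant.parse("2022-02-14T11:38:01.123Z");
        countPerWindow(now, 60_000L, clicks)
                .forEach((key, count) -> System.out.println(key + " count = " + count));
    }
}
```

Running `main` prints `2022-02-14T11:38:00Z|cnn.com|/some-article count = 3` and `2022-02-14T11:38:00Z|cnn.com|/another-article count = 1`. Note that `Instant#toString` omits zero milliseconds; if the exact `.000Z` form matters for the Elasticsearch index, a `DateTimeFormatter` would be needed.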

On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:

Hello John,

Currently you are grouping the elements twice based on a time attribute: once while keying (with event time) and once while windowing (with processing time). Therefore, the windowing mechanism produces a new window computation whenever it sees an element with the same key that arrives after the previous window's start and end timestamps. Can you please clarify which notion of time you would like to use to handle the stream of data?

Sincerely,

Ali

On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:

OK, I used the method suggested by Ali. The error is gone, but now I see multiple counts emitted for the same key...

    DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .uid(kafkaTopic).name(kafkaTopic)
            .setParallelism(kafkaParallelism)
            // Timestamp created here in GMT, rounded down to the minute.
            .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
            .uid("map-json-logs").name("map-json-logs");

    slStream.keyBy(new MinutesKeySelector())
            // Tumbling window of 1 minute.
            .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))

As you can see below, a second count was emitted for 16:51 and for 16:55:

    {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
    -----
    {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
    {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
    -----
    {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
    {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
    {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
    -----
    {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
    {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
    -----
    {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}

On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:

OK, I think Ali's solution makes the most sense to me. I'll try it and let you know.

On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:

Hi John,

your getKey() implementation is not deterministic: calling it multiple times with the same click instance can return different keys. For example, a call at 12:01:59.950 and a call at 12:02:00.050 with the same click instance will return two different keys:

2022-04-07T12:01:00.000Z|cnn.com|some-article-name
2022-04-07T12:02:00.000Z|cnn.com|some-article-name

best regards
Jing

On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:

Maybe there's a misunderstanding. Basically, I want to do a clickstream count for a given URL and, for simplicity and accuracy of the count, base it on processing time (event time doesn't matter as long as I get the total number of clicks at that given processing time).

So, regardless of the event time, I want all clicks for the current processing time, rounded to the minute, per link.

So, if now was 2022-04-07T12:01:00.000Z, then I would want the following result...

2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
...
2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
And so on...

    @Override
    public MyEventCountKey getKey(final MyEvent click) throws Exception {
        MyEventCountKey key = new MyEventCountKey(
                Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
                        ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
                click.getDomain(), // cnn.com
                click.getPath()    // /some-article-name
        );
        return key;
    }

On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:

> The key selector works.

No, it does not ;) It depends on the system time, so it's not deterministic (you can get different keys for the very same element).

> How do you key a count based on the time? I have taken this from samples online.

This is what windowing is for. You basically want to group / combine elements per key and event-time window [1].

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

Best,
D.

On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:

The key selector works. It only causes an issue if too many keys are produced in one shot. For example, if 100 "same" keys are produced for that 1 minute, it's OK. But if 101 are produced, the error happens.

If you look at the reproducer, at least that's what's happening.

How do you key a count based on the time? I have taken this from samples online.

The key is that particular time for that particular URL path.

So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00.

On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler <ches...@apache.org> wrote:

Your key selector doesn't need to implement hashCode, but given the same object it has to return the same key. In your reproducer the returned key will have different timestamps, and since the timestamp is included in the hashCode, the keys will be different each time.

On 07/02/2022 14:50, John Smith wrote:

I don't get it. I provided the reproducer. I implemented the KeySelector interface; does it need hashCode and equals as well?

I'm attempting to do a clickstream count. So the key is based on the processing date/time rounded to the minute + domain name + path.

So these should be valid below?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler <ches...@apache.org> wrote:

Don't KeySelectors also need to be deterministic?

> The {@link KeySelector} allows to use deterministic objects for operations such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key must be the same.

On 04/02/2022 18:25, John Smith wrote:

Hi Francesco, here is the reproducer: https://github.com/javadevmtl/flink-key-reproducer

Essentially, it looks like the exception is thrown when there's a high influx of records produced from the source.

The key is generated from 3 values: the date/time rounded to the minute and 2 strings. So you will see keys as follows...

2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a record in a loop and sleeps for a specified number of milliseconds (100 ms in this case). The lower the sleep delay, the faster records are produced and the more likely the exception is thrown. With a 100 ms delay it's always thrown. Setting it to 2000 to 3000 ms guarantees it works. The original job uses a Kafka source, so it should technically be able to handle even a couple thousand records per second.

On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:

OK, it's not my data either. I think it may be a volume issue. I have managed to consistently reproduce the error. I'll upload a reproducer ASAP.
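Jing's and Chesnay's point about determinism can be checked without a cluster. In the sketch below, `clockBasedKey` has the same shape as the original `getKey()` (the minute comes from a clock, not from the element), while `elementBasedKey` depends only on the click's own fields. Passing a `java.time.Clock` instead of calling `Instant.now()` directly is an assumption made only so that the two calls can be pinned to Jing's 12:01:59.950 and 12:02:00.050; `KeyDeterminismDemo` and its methods are hypothetical names. As far as I understand Flink's runtime, the key selector is evaluated once on the sending side to choose the target subtask and again in the receiving operator to scope its state. If a minute boundary falls between those two calls, the record can reach a subtask that does not own the recomputed key's key group, which would explain why higher record rates make the `Key group ... is not in KeyGroupRange` exception more likely.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical demo: a key that reads the clock is not a function of the element alone.
final class KeyDeterminismDemo {

    // Same shape as the original selector: minute taken from "now", plus domain and path.
    static String clockBasedKey(Clock clock, String domain, String path) {
        return Instant.now(clock).truncatedTo(ChronoUnit.MINUTES) + "|" + domain + "|" + path;
    }

    // Deterministic alternative: only the element's own fields; time is left to the window.
    static String elementBasedKey(String domain, String path) {
        return domain + "|" + path;
    }

    // Helper that freezes "now" at a given UTC instant.
    static Clock fixedAt(String isoInstant) {
        return Clock.fixed(Instant.parse(isoInstant), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Two evaluations for the same click, 100 ms apart, straddling a minute boundary.
        Clock first = fixedAt("2022-04-07T12:01:59.950Z");
        Clock second = fixedAt("2022-04-07T12:02:00.050Z");

        System.out.println(clockBasedKey(first, "cnn.com", "/some-article-name"));
        System.out.println(clockBasedKey(second, "cnn.com", "/some-article-name"));
        System.out.println(elementBasedKey("cnn.com", "/some-article-name"));
    }
}
```

The two clock-based keys differ (`...T12:01:00Z|...` versus `...T12:02:00Z|...`) even though the click is the same, whereas the element-based key stays `cnn.com|/some-article-name` however often it is evaluated.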

On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:

OK, so I tried to create a reproducer, but I couldn't reproduce it. Yet the actual job throws that error once in a while. So I'm wondering if maybe one of the incoming records is not valid, though I do validate them before they reach the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:

Actually, maybe not, because with PrintSinkFunction it ran for a bit and then threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:

OK, it may be the Elasticsearch connector causing the issue?

If I use PrintSinkFunction, then I get no error and my stats print as expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <france...@ververica.com> wrote:

Hi,
your hashCode and equals seem correct. Can you post a minimal stream pipeline reproducer using this class?

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:

Hi, I'm getting:

java.lang.IllegalArgumentException: Key group 39 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).

This is my class. Is my hashCode deterministic?

    public final class MyEventCountKey {
        private final String countDateTime;
        private final String domain;
        private final String event;

        public MyEventCountKey(final String countDateTime, final String domain, final String event) {
            this.countDateTime = countDateTime;
            this.domain = domain;
            this.event = event;
        }

        public String getCountDateTime() {
            return countDateTime;
        }

        public String getDomain() {
            return domain;
        }

        public String getEvent() {
            return event;
        }

        @Override
        public String toString() {
            return countDateTime + "|" + domain + "|" + event;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            MyEventCountKey that = (MyEventCountKey) o;
            return countDateTime.equals(that.countDateTime)
                    && domain.equals(that.domain)
                    && event.equals(that.event);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + countDateTime.hashCode();
            result = prime * result + domain.hashCode();
            result = prime * result + event.hashCode();
            return result;
        }
    }
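Francesco's verdict on this class can be made concrete. For non-null fields, the hand-written `hashCode()` performs exactly the computation of `java.util.Objects.hash(countDateTime, domain, event)`, because `Objects.hash` delegates to `Arrays.hashCode`, which also starts from 1 and computes `31 * result + element.hashCode()` per element. So the class is deterministic: equal field values always give equal hashes, and the nondeterminism can only enter through the value passed in as `countDateTime`. The sketch below uses a hypothetical, trimmed `EventCountKey` to check this equivalence; it is not the class from the thread.

```java
import java.util.Objects;

// Hypothetical trimmed copy of the key class, using Objects.hash / Objects.equals.
final class EventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    EventCountKey(String countDateTime, String domain, String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EventCountKey that = (EventCountKey) o;
        return Objects.equals(countDateTime, that.countDateTime)
                && Objects.equals(domain, that.domain)
                && Objects.equals(event, that.event);
    }

    // Same value as the hand-written 31 * result + field.hashCode() chain for non-null fields.
    @Override
    public int hashCode() {
        return Objects.hash(countDateTime, domain, event);
    }

    // The manual formula from the thread, kept only for comparison.
    static int manualHash(String countDateTime, String domain, String event) {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }

    public static void main(String[] args) {
        EventCountKey a = new EventCountKey("2022-02-04T17:20:00Z", "foo", "bar");
        EventCountKey b = new EventCountKey("2022-02-04T17:20:00Z", "foo", "bar");
        EventCountKey c = new EventCountKey("2022-02-04T17:21:00Z", "foo", "bar");
        // Equal fields: equal objects and equal hashes.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
        // Objects.hash agrees with the manual formula.
        System.out.println(a.hashCode() == manualHash("2022-02-04T17:20:00Z", "foo", "bar"));
        // A different minute is a different key.
        System.out.println(a.equals(c));
    }
}
```

The design point is that `hashCode` and `equals` only need to be consistent with each other; the exception in this thread comes from the key selector producing different field values for the same element, not from the hash function.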