Hello John,

That is exactly what the window operator does for you. Could you please check
the documentation [1] and let us know which part of the window operator alone
does not suffice for your use case?
Sincerely,

Ali

[1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows

On Mon, Feb 14, 2022 at 4:03 PM John Smith <java.dev....@gmail.com> wrote:

> Because I want to group them for the last X minutes. In this case the
> last 1 minute.
>
> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:
>
>> Hello John,
>>
>> Then may I ask why you need to use a time attribute as part of your key?
>> Why not just key by the fields like `mydomain.com` and `some-article` in
>> your example and use only the window operator for grouping elements
>> based on time?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Mon, Feb 14, 2022 at 3:55 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Hi, thanks. As previously mentioned, processing time. So regardless of
>>> when an event was generated, I want to count all events I have right
>>> now (as soon as they are seen by the Flink job).
>>>
>>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:
>>>
>>>> Hello John,
>>>>
>>>> Currently you are grouping the elements two times based on a time
>>>> attribute: once while keying (with event time) and once while
>>>> windowing (with processing time). Therefore, the windowing mechanism
>>>> produces a new window computation when it sees an element with the
>>>> same key that arrived later than the previous window's start and end
>>>> timestamps. Can you please clarify with which notion of time you
>>>> would like to handle the stream of data?
>>>>
>>>> Sincerely,
>>>>
>>>> Ali
>>>>
>>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Ok, I used the method suggested by Ali. The error is gone, but now I
>>>>> see multiple counts emitted for the same key...
>>>>>
>>>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>>>>>         WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>>>     .uid(kafkaTopic).name(kafkaTopic)
>>>>>     .setParallelism(kafkaParallelism)
>>>>>     // Timestamp in GMT created here, rounded down to the closest minute.
>>>>>     .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>>>>>     .uid("map-json-logs").name("map-json-logs");
>>>>>
>>>>> slStream.keyBy(new MinutesKeySelector())
>>>>>     // Tumbling window of 1 minute.
>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>>>>>
>>>>> So below you will see that a second count was emitted for 16:51 and 16:55:
>>>>>
>>>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
>>>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
>>>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
>>>>>
>>>>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Ok, I think Ali's solution makes the most sense to me. I'll try it
>>>>>> and let you know.
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Your getKey() implementation is not deterministic, since calling it
>>>>>>> with the same click instance multiple times will return different
>>>>>>> keys. For example, a call at 12:01:59.950 and a call at 12:02:00.050
>>>>>>> with the same click instance will return two different keys:
>>>>>>>
>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>>>>>
>>>>>>> Best regards
>>>>>>> Jing
>>>>>>>
>>>>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Maybe there's a misunderstanding, but basically I want to do a
>>>>>>>> clickstream count for a given "url", and for simplicity and
>>>>>>>> accuracy of the count I base it on processing time (event time
>>>>>>>> doesn't matter as long as I get a total of clicks at that given
>>>>>>>> processing time).
>>>>>>>>
>>>>>>>> So regardless of the event time, I want all clicks for the current
>>>>>>>> processing time rounded to the minute, per link.
>>>>>>>>
>>>>>>>> So, if now was 2022-04-07T12:01:00.000Z, then I would want the
>>>>>>>> following result...
>>>>>>>>
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>>>>>> ....
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>>>>>> And so on...
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>>>>>>     MyEventCountKey key = new MyEventCountKey(
>>>>>>>>         Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>>>>>             ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>>>>>         click.getDomain(), // cnn.com
>>>>>>>>         click.getPath()    // /some-article-name
>>>>>>>>     );
>>>>>>>>     return key;
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>>>>>>
>>>>>>>>>> The key selector works.
>>>>>>>>>
>>>>>>>>> No, it does not ;) It depends on the system time, so it's not
>>>>>>>>> deterministic (you can get different keys for the very same
>>>>>>>>> element).
>>>>>>>>>
>>>>>>>>>> How do you key a count based on the time? I have taken this from
>>>>>>>>>> samples online.
>>>>>>>>>
>>>>>>>>> This is what the windowing is for. You basically want to group /
>>>>>>>>> combine elements per key and event time window [1].
>>>>>>>>>
>>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> The key selector works. It only causes an issue if too many keys
>>>>>>>>>> are produced in one shot. For example, if 100 "same" keys are
>>>>>>>>>> produced for that 1 minute, it's ok.
>>>>>>>>>> But if 101 are produced, the error happens.
>>>>>>>>>>
>>>>>>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>>>>>>
>>>>>>>>>> How do you key a count based on the time? I have taken this from
>>>>>>>>>> samples online.
>>>>>>>>>>
>>>>>>>>>> The key is that particular time for that particular URL path.
>>>>>>>>>>
>>>>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00.
>>>>>>>>>>
>>>>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Your key selector doesn't need to implement hashCode, but given
>>>>>>>>>>> the same object it has to return the same key.
>>>>>>>>>>> In your reproducer the returned key will have different
>>>>>>>>>>> timestamps, and since the timestamp is included in the hashCode,
>>>>>>>>>>> they will be different each time.
>>>>>>>>>>>
>>>>>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>>>>>
>>>>>>>>>>> I don't get it? I provided the reproducer. I implemented the
>>>>>>>>>>> KeySelector interface; does it need hashCode and equals as well?
>>>>>>>>>>>
>>>>>>>>>>> I'm attempting to do clickstream counting. So the key is based
>>>>>>>>>>> on the processing date/time rounded to the minute + domain name
>>>>>>>>>>> + path.
>>>>>>>>>>>
>>>>>>>>>>> So these should be valid below?
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>>>>
>>>>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>>>>>
>>>>>>>>>>>> The {@link KeySelector} allows to use deterministic objects for
>>>>>>>>>>>> operations such as reduce, reduceGroup, join, coGroup, etc. If
>>>>>>>>>>>> invoked multiple times on the same object, the returned key
>>>>>>>>>>>> must be the same.
>>>>>>>>>>>>
>>>>>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Francesco, here is the reproducer:
>>>>>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>>>>>
>>>>>>>>>>>> So, essentially it looks like the exception is thrown when
>>>>>>>>>>>> there's a high influx of records produced from the source.
>>>>>>>>>>>>
>>>>>>>>>>>> The key is generated from 3 values: the date/time rounded to
>>>>>>>>>>>> the minute and 2 strings. So you will see keys as follows...
>>>>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>>>>>
>>>>>>>>>>>> The reproducer has a custom source that produces a record in a
>>>>>>>>>>>> loop and sleeps for a specified number of milliseconds, 100ms
>>>>>>>>>>>> in this case. The lower the sleep delay, the faster records are
>>>>>>>>>>>> produced and the more likely the exception is thrown. With a
>>>>>>>>>>>> 100ms delay it's always thrown; setting 2000 to 3000ms will
>>>>>>>>>>>> guarantee it works. The original job uses a Kafka source, so it
>>>>>>>>>>>> should technically be able to handle even a couple thousand
>>>>>>>>>>>> records per second.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok, it's not my data either. I think it may be a volume issue.
>>>>>>>>>>>>> I have managed to consistently reproduce the error. I'll
>>>>>>>>>>>>> upload a reproducer ASAP.
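As background on why a wall-clock-dependent key produces exactly this exception: Flink hashes each key to a key group, and every parallel subtask owns a fixed range of key groups. The routing can be sketched in plain Java, with no Flink dependencies. This is a simplified stand-in only: the real `KeyGroupRangeAssignment` additionally applies a murmur hash to `key.hashCode()` before the modulo, which this toy version omits.

```java
public class KeyGroupSketch {

    // Simplified stand-in for how a keyed record is routed to a key group.
    // (Flink's actual assignment murmur-hashes key.hashCode() first; only
    // the shape of the computation is kept here.)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // The same logical element, but a key selector that calls
        // Instant.now() stamps it with the wall clock, so the key computed
        // when the record is shuffled can differ from the key recomputed
        // later for the state lookup:
        String keyAtShuffle = "2022-02-04T17:20:00Z|foo|bar";
        String keyAtLookup  = "2022-02-04T17:21:00Z|foo|bar";
        System.out.println("shuffle -> key group " + assignToKeyGroup(keyAtShuffle, maxParallelism));
        System.out.println("lookup  -> key group " + assignToKeyGroup(keyAtLookup, maxParallelism));
        // If the two key groups differ and the second one belongs to
        // another subtask's range, the result is "Key group X is not in
        // KeyGroupRange{...}" -- which also explains why the error only
        // shows up under load, when elements straddle a minute boundary.
    }
}
```

The point of the sketch: key-group routing is pure arithmetic on the key's hash, so any key selector whose output depends on `Instant.now()` can route the same element to two different key groups.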
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok, so I tried to create a reproducer but I couldn't
>>>>>>>>>>>>>> reproduce it. Yet the actual job throws that error once in a
>>>>>>>>>>>>>> while. So I'm wondering if maybe one of the incoming records
>>>>>>>>>>>>>> is not valid, though I do validate prior to getting to the
>>>>>>>>>>>>>> key and window operators.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Actually maybe not, because with PrintSinkFunction it ran
>>>>>>>>>>>>>>> for a bit and then it threw the error.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ok, could the Elasticsearch connector be causing the issue?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my
>>>>>>>>>>>>>>>> stats print as expected.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <france...@ververica.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> Your hashCode and equals seem correct. Can you post a
>>>>>>>>>>>>>>>>> minimal stream pipeline reproducer using this class?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi, I'm getting java.lang.IllegalArgumentException: Key
>>>>>>>>>>>>>>>>>> group 39 is not in KeyGroupRange{startKeyGroup=96,
>>>>>>>>>>>>>>>>>> endKeyGroup=103}.
>>>>>>>>>>>>>>>>>> Unless you're directly using low level state access
>>>>>>>>>>>>>>>>>> APIs, this is most likely caused by a non-deterministic
>>>>>>>>>>>>>>>>>> shuffle key (hashCode and equals implementation).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is my class; is my hashCode deterministic?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime,
>>>>>>>>>>>>>>>>>>             final String domain, final String event) {
>>>>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>>>>>>>>>>>             && domain.equals(that.domain)
>>>>>>>>>>>>>>>>>>             && event.equals(that.event);
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> }
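Tying the thread together, the direction the responders converge on can be sketched end to end in plain Java. This is a hedged sketch with no Flink dependencies: `Click`, `ClickKey`, and the one-minute bucketing below are illustrative names standing in for the poster's classes, and `countPerMinute` is only a toy stand-in for `keyBy(...)` plus a one-minute tumbling window. The point is that the key holds only stable fields, so repeated calls for the same element always yield the same key and hashCode, while the time bucket comes from windowing.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DeterministicKeyExample {

    // Key with only the stable fields: no timestamp, so it is
    // deterministic, as KeySelector requires.
    public static final class ClickKey {
        final String domain;
        final String path;

        public ClickKey(String domain, String path) {
            this.domain = domain;
            this.path = path;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ClickKey)) return false;
            ClickKey that = (ClickKey) o;
            return domain.equals(that.domain) && path.equals(that.path);
        }

        @Override
        public int hashCode() {
            return Objects.hash(domain, path);
        }

        @Override
        public String toString() {
            return domain + "|" + path;
        }
    }

    // Illustrative event type (not the poster's actual MyEvent class).
    public record Click(Instant timestamp, String domain, String path) {}

    // Toy stand-in for keyBy(ClickKey) + a one-minute tumbling window:
    // the minute bucket is derived per element, not baked into the key.
    public static Map<String, Long> countPerMinute(List<Click> clicks) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Click c : clicks) {
            Instant bucket = c.timestamp().truncatedTo(ChronoUnit.MINUTES);
            ClickKey key = new ClickKey(c.domain(), c.path());
            counts.merge(bucket + "|" + key, 1L, Long::sum);
        }
        return counts;
    }
}
```

With this shape, two clicks straddling a minute boundary still carry the same key; only the window they land in differs, which removes both the KeyGroupRange exception and the duplicate per-minute counts seen earlier in the thread.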