Hello John,

Then may I ask why you need to use a time attribute as part of your key? Why not just key by fields like `mydomain.com` and `some-article` from your example, and use only the window operator for grouping elements based on time?
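To illustrate the distinction, here is a stdlib-only sketch (plain `java.time`, no Flink; the method names `timeBasedKey` and `fieldOnlyKey` are invented for illustration) showing why a key derived from the wall clock is non-deterministic while a field-only key is stable:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class KeyDeterminismDemo {
    // Hypothetical time-based key, mirroring a "minute|domain|path" key selector.
    static String timeBasedKey(Instant processingTime, String domain, String path) {
        return processingTime.truncatedTo(ChronoUnit.MINUTES) + "|" + domain + "|" + path;
    }

    // Field-only key, as suggested: the window operator owns the time dimension.
    static String fieldOnlyKey(String domain, String path) {
        return domain + "|" + path;
    }

    public static void main(String[] args) {
        // The same click evaluated just before and just after a minute rollover:
        Instant before = Instant.parse("2022-02-14T12:01:59.950Z");
        Instant after  = Instant.parse("2022-02-14T12:02:00.050Z");

        String k1 = timeBasedKey(before, "mydomain.com", "/some-article");
        String k2 = timeBasedKey(after,  "mydomain.com", "/some-article");
        System.out.println(k1.equals(k2)); // false: same element, two different keys

        String f1 = fieldOnlyKey("mydomain.com", "/some-article");
        String f2 = fieldOnlyKey("mydomain.com", "/some-article");
        System.out.println(f1.equals(f2)); // true: deterministic
    }
}
```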
Sincerely,
Ali

On Mon, Feb 14, 2022 at 3:55 PM John Smith <java.dev....@gmail.com> wrote:

> Hi, thanks. As previously mentioned, processing time. So regardless of when
> the event was generated, I want to count all events I have right now (as
> soon as they are seen by the Flink job).
>
> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:
>
>> Hello John,
>>
>> Currently you are grouping the elements two times based on some time
>> attribute: once while keying - with event time - and once while windowing -
>> with processing time. Therefore, the windowing mechanism produces a new
>> window computation when it sees an element with the same key that arrived
>> later than the previous window's start and end timestamps. Can you please
>> clarify which notion of time you would like to use to handle the stream
>> of data?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Ok, I used the method suggested by Ali. The error is gone. But now I see
>>> multiple counts emitted for the same key...
>>>
>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>>>         WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>     .uid(kafkaTopic).name(kafkaTopic)
>>>     .setParallelism(kafkaParallelism)
>>>     .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>>>         // <-- Timestamp in GMT created here, rounded down to the closest minute.
>>>     .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector())
>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>>>         // <-- Tumbling window of 1 minute.
>>>
>>> So below you will see a new count was emitted at 16:51 and at 16:55:
>>>
>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
>>> -----
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
>>> -----
>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
>>> -----
>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
>>> -----
>>> {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
>>>
>>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Ok, I think Ali's solution makes the most sense to me. I'll try it and
>>>> let you know.
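The duplicate emissions above can be reproduced without Flink. The sketch below (plain Java; all timestamps invented, `SplitKeyDemo` is a hypothetical name) simulates a key stamped in `flatMap` from the wall clock while the tumbling window is assigned slightly later: an element processed just after a minute rollover keeps the previous minute's key but lands in the next window, so the same key produces two counts.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

public class SplitKeyDemo {
    // Each event is (instant when the key was stamped in flatMap,
    //                instant when the window operator saw the element).
    // One pane is kept per (key, window start), like a tumbling window.
    static Map<String, Integer> panes(Instant[][] events) {
        Map<String, Integer> panes = new LinkedHashMap<>();
        for (Instant[] e : events) {
            String key = e[0].truncatedTo(ChronoUnit.MINUTES) + "|mydomain.com|/some-article";
            Instant windowStart = e[1].truncatedTo(ChronoUnit.MINUTES);
            panes.merge(key + " @ window " + windowStart, 1, Integer::sum);
        }
        return panes;
    }

    public static void main(String[] args) {
        // Invented timestamps mimicking a tiny backlog across the 16:52 rollover:
        Instant[][] events = {
            { Instant.parse("2022-02-11T16:51:10.000Z"), Instant.parse("2022-02-11T16:51:10.050Z") },
            { Instant.parse("2022-02-11T16:51:30.000Z"), Instant.parse("2022-02-11T16:51:30.050Z") },
            { Instant.parse("2022-02-11T16:51:59.980Z"), Instant.parse("2022-02-11T16:52:00.020Z") },
        };
        // The 16:51 key appears in two different windows -> two counts emitted:
        panes(events).forEach((pane, count) -> System.out.println(pane + " -> " + count));
    }
}
```

This matches the symptom in the output above: one large count for the bulk of the minute, plus a small straggler count for the same `countDateTime`.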
>>>>
>>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Your getKey() implementation shows that it is not deterministic, since
>>>>> calling it with the same click instance multiple times can return
>>>>> different keys. For example, a call at 12:01:59.950 and a call at
>>>>> 12:02:00.050 with the same click instance will return two different keys:
>>>>>
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>>>
>>>>> Best regards,
>>>>> Jing
>>>>>
>>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Maybe there's a misunderstanding. But basically I want to do
>>>>>> clickstream counts for a given "url", and for simplicity and accuracy
>>>>>> of the count, base them on processing time (event time doesn't matter,
>>>>>> as long as I get a total of clicks at that given processing time).
>>>>>>
>>>>>> So regardless of the event time, I want all clicks for the current
>>>>>> processing time, rounded to the minute, per link.
>>>>>>
>>>>>> So, if now was 2022-04-07T12:01:00.000Z, then I would want the
>>>>>> following result...
>>>>>>
>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>>>> ....
>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>>>> And so on...
>>>>>>
>>>>>> @Override
>>>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>>>>     MyEventCountKey key = new MyEventCountKey(
>>>>>>         Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>>>             ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>>>         click.getDomain(), // cnn.com
>>>>>>         click.getPath()    // /some-article-name
>>>>>>     );
>>>>>>     return key;
>>>>>> }
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>>>>
>>>>>>> The key selector works.
>>>>>>>
>>>>>>> No it does not ;) It depends on the system time, so it's not
>>>>>>> deterministic (you can get different keys for the very same element).
>>>>>>>
>>>>>>>> How do you key a count based on the time? I have taken this from
>>>>>>>> samples online.
>>>>>>>
>>>>>>> This is what the windowing is for. You basically want to group /
>>>>>>> combine elements per key and event time window [1].
>>>>>>>
>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>>>>
>>>>>>> Best,
>>>>>>> D.
>>>>>>>
>>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>
>>>>>>>> The key selector works. It only causes an issue if too many keys are
>>>>>>>> produced in one shot. For example, if 100 "same" keys are produced
>>>>>>>> for that 1 minute, it's ok. But if 101 are produced, the error happens.
>>>>>>>>
>>>>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>>>>
>>>>>>>> How do you key a count based on the time? I have taken this from
>>>>>>>> samples online.
>>>>>>>>
>>>>>>>> The key is that particular time for that particular URL path.
>>>>>>>>
>>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>>>>>
>>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m.
Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Your KeySelector doesn't need to implement hashCode, but given
>>>>>>>>> the same object it has to return the same key.
>>>>>>>>> In your reproducer the returned key will have different
>>>>>>>>> timestamps, and since the timestamp is included in the hashCode,
>>>>>>>>> they will be different each time.
>>>>>>>>>
>>>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>>>
>>>>>>>>> I don't get it? I provided the reproducer. I implemented the
>>>>>>>>> KeySelector interface; it needs hashCode and equals as well?
>>>>>>>>>
>>>>>>>>> I'm attempting to do click stream. So the key is based on the
>>>>>>>>> processing date/time rounded to the minute + domain name + path.
>>>>>>>>>
>>>>>>>>> So these should be valid below?
>>>>>>>>>
>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>
>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>>>
>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>>
>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>>
>>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>>>
>>>>>>>>>> The {@link KeySelector} allows to use deterministic objects for
>>>>>>>>>> operations such as reduce, reduceGroup, join, coGroup, etc.
If invoked multiple times on the same object, the returned key must
>>>>>>>>>> be the same.
>>>>>>>>>>
>>>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Francesco, here is the reproducer:
>>>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>>>
>>>>>>>>>> So, essentially, it looks like the exception is thrown when there
>>>>>>>>>> is a high influx of records produced from the source.
>>>>>>>>>>
>>>>>>>>>> The key is generated from 3 values: the date/time rounded to the
>>>>>>>>>> minute, and 2 strings. So you will see keys as follows...
>>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>>>
>>>>>>>>>> The reproducer has a custom source that basically produces a
>>>>>>>>>> record in a loop and sleeps for a specified period of milliseconds,
>>>>>>>>>> 100ms in this case. The lower the sleep delay, the faster records
>>>>>>>>>> are produced and the more likely the exception is thrown. With a
>>>>>>>>>> 100ms delay it's always thrown; setting 2000 to 3000ms will
>>>>>>>>>> guarantee it works. The original job uses a Kafka source, so it
>>>>>>>>>> should technically be able to handle even a couple thousand records
>>>>>>>>>> per second.
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, it's not my data either. I think it may be a volume issue. I
>>>>>>>>>>> have managed to consistently reproduce the error. I'll upload a
>>>>>>>>>>> reproducer ASAP.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok, so I tried to create a reproducer but I couldn't reproduce
>>>>>>>>>>>> it. But the actual job once in a while throws that error.
So I'm wondering if maybe one of the records that comes in is not
>>>>>>>>>>>> valid, though I do validate prior to getting to the key and
>>>>>>>>>>>> window operators.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Actually, maybe not, because with PrintSinkFunction it ran for
>>>>>>>>>>>>> a bit and then it threw the error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok, could it be the ElasticSearch connector causing the issue?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>>>>> print as expected.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <france...@ververica.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> Your hashCode and equals seem correct. Can you post a
>>>>>>>>>>>>>>> minimal stream pipeline reproducer using this class?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi, I'm getting:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> java.lang.IllegalArgumentException: Key group 39 is not in
>>>>>>>>>>>>>>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless
>>>>>>>>>>>>>>>> you're directly using low level state access APIs, this is
>>>>>>>>>>>>>>>> most likely caused by non-deterministic shuffle key
>>>>>>>>>>>>>>>> (hashCode and equals implementation).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime,
>>>>>>>>>>>>>>>>             final String domain, final String event) {
>>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>>>>>>>>>                 && domain.equals(that.domain)
>>>>>>>>>>>>>>>>                 && event.equals(that.event);
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }
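As the thread concludes, the class's equals/hashCode are themselves fine: for the same field values they are fully deterministic. The non-determinism is in the input, because countDateTime is stamped from the wall clock in the key selector. A small stdlib-only check (the `hash` helper below is a stand-in re-implementation of the class's hashCode over its three fields, invented for illustration):

```java
public class HashCheckDemo {
    // Stand-in for MyEventCountKey.hashCode() over its three String fields.
    static int hash(String countDateTime, String domain, String event) {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }

    public static void main(String[] args) {
        // Same field values -> same hash, every time:
        int h1 = hash("2022-02-01T19:30:00Z", "cnn.com", "/article1");
        int h2 = hash("2022-02-01T19:30:00Z", "cnn.com", "/article1");
        System.out.println(h1 == h2); // true: the hash itself is deterministic

        // But a key stamped one minute later hashes (and so routes) differently:
        int h3 = hash("2022-02-01T19:31:00Z", "cnn.com", "/article1");
        System.out.println(h1 == h3); // false: the changing timestamp is the culprit
    }
}
```

So the "Key group ... is not in KeyGroupRange" error comes from the key selector returning different keys for the same element, not from a faulty hashCode.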