Ok I think Ali's solution makes the most sense to me. I'll try it and let you know.
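
For anyone reading the thread later, here is a minimal plain-Java sketch (no Flink dependency) of the point Jing and Chesnay make in the quoted messages. The Click class and both key methods are hypothetical stand-ins, not the code from the reproducer. It passes the clock in explicitly so that the two call times from Jing's example can be replayed on the same click instance.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class KeyDeterminismSketch {

    // Hypothetical stand-in for the click event discussed in the thread.
    static final class Click {
        final String domain;
        final String path;

        Click(String domain, String path) {
            this.domain = domain;
            this.path = path;
        }
    }

    // Mirrors the problematic pattern: the key depends on the wall clock,
    // so the same element can map to different keys.
    static String clockBasedKey(Click click, Clock clock) {
        Instant minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + click.domain + "|" + click.path;
    }

    // Deterministic alternative: the key depends only on the element.
    // The per-minute grouping would then be left to a window, not the key.
    static String deterministicKey(Click click) {
        return click.domain + "|" + click.path;
    }

    public static void main(String[] args) {
        Click click = new Click("cnn.com", "some-article-name");
        Clock before = Clock.fixed(Instant.parse("2022-04-07T12:01:59.950Z"), ZoneOffset.UTC);
        Clock after = Clock.fixed(Instant.parse("2022-04-07T12:02:00.050Z"), ZoneOffset.UTC);

        // Same click instance, two call times 100 ms apart, two different keys.
        System.out.println(clockBasedKey(click, before));
        System.out.println(clockBasedKey(click, after));

        // The element-only key is identical on every call.
        System.out.println(deterministicKey(click).equals(deterministicKey(click)));
    }
}
```

With a key like deterministicKey, the minute bucket would presumably come from a processing-time window rather than from the key itself, which is what David's windowing link describes.
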
On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> different keys. For example, a call at 12:01:59.950 and a call at
> 12:02:00.050 with the same click instance will return two different keys:
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>
> best regards
> Jing
>
> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Maybe there's a misunderstanding. But basically I want to do a clickstream
>> count for a given "url" and, for simplicity and accuracy of the count, base
>> it on processing time (event time doesn't matter as long as I get a total
>> of clicks at that given processing time).
>>
>> So regardless of the event time, I want all clicks for the current
>> processing time rounded to the minute per link.
>>
>> So, if now was 2022-04-07T12:01:00.000Z
>>
>> Then I would want the following result...
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>> ....
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>> And so on...
>>
>> @Override
>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>> {
>>     MyEventCountKey key = new MyEventCountKey(
>>         Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>             ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>         click.getDomain(), // cnn.com
>>         click.getPath() // /some-article-name
>>     );
>>     return key;
>> }
>>
>>
>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>
>>>> The key selector works.
>>>
>>> No it does not ;) It depends on the system time so it's not
>>> deterministic (you can get different keys for the very same element).
>>>
>>>> How do you key a count based on the time? I have taken this from samples
>>>> online.
>>>
>>> This is what the windowing is for. You basically want to group / combine
>>> elements per key and event time window [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> The key selector works. It only causes an issue if there are too many keys
>>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>>> 1 minute it's ok. But if 101 are produced the error happens.
>>>>
>>>> If you look at the reproducer, at least that's what's happening.
>>>>
>>>> How do you key a count based on the time? I have taken this from
>>>> samples online.
>>>>
>>>> The key is that particular time for that particular URL path.
>>>>
>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>
>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>>> same object it has to return the same key.
>>>>> In your reproducer the returned key will have different timestamps,
>>>>> and since the timestamp is included in the hashCode, they will be
>>>>> different each time.
>>>>>
>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>
>>>>> I don't get it? I provided the reproducer. I implemented the KeySelector
>>>>> interface. It needs hashCode and equals as well?
>>>>>
>>>>> I'm attempting to do click stream. So the key is based on processing
>>>>> date/time rounded to the minute + domain name + path
>>>>>
>>>>> So these should be valid below?
>>>>>
>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>
>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>
>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>
>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>
>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>
>>>>>> The {@link KeySelector} allows to use deterministic objects for
>>>>>> operations such as reduce, reduceGroup, join, coGroup, etc. *If invoked
>>>>>> multiple times on the same object, the returned key must be the same.*
>>>>>>
>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>
>>>>>> Hi Francesco, here is the reproducer:
>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>
>>>>>> So, essentially it looks like the exception is thrown when there's a
>>>>>> high influx of records produced from the source.
>>>>>>
>>>>>> The key is generated from 3 values: date/time rounded to the minute and
>>>>>> 2 strings.
>>>>>> So you will see keys as follows...
>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>
>>>>>> The reproducer has a custom source that basically produces a
>>>>>> record in a loop and sleeps for a specified period of milliseconds, 100ms
>>>>>> in this case.
>>>>>> The lower the sleep delay, the faster records are produced and the more
>>>>>> likely the exception is thrown. With a 100ms delay it's always thrown.
>>>>>> Setting a 2000 to 3000ms delay will guarantee it works.
>>>>>> The original job uses a Kafka Source so it should technically be able
>>>>>> to handle even a couple thousand records per second.
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok it's not my data either. I think it may be a volume issue. I have
>>>>>>> managed to consistently reproduce the error. I'll upload a reproducer
>>>>>>> ASAP.
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it.
>>>>>>>> But the actual job once in a while throws that error. So I'm wondering
>>>>>>>> if maybe one of the records that comes in is not valid, though I do
>>>>>>>> validate prior to getting to the key and window operators.
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a bit
>>>>>>>>> and then it threw the error.
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>
>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats print
>>>>>>>>>> as expected.
>>>>>>>>>>
>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> your hash code and equals seem correct. Can you post a minimal
>>>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>>>
>>>>>>>>>>> FG
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <
>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}.
>>>>>>>>>>>> Unless you're directly using low level state access APIs, this is
>>>>>>>>>>>> most likely caused by non-deterministic shuffle key (hashCode and
>>>>>>>>>>>> equals implementation).
>>>>>>>>>>>>
>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>
>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>
>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime,
>>>>>>>>>>>>             final String domain, final String event) {
>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>>         return event;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>         return result;
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
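
A rough sketch of why a key that changes between calls can end in the "Key group ... is not in KeyGroupRange" error from the first message. As far as I understand it, the key is evaluated once to route the record to a subtask and again in the keyed operator, and each evaluation is hashed into a key group. The code below is a simplified stand-in, not Flink's implementation: Flink mixes hashCode() with a murmur hash first, and the max parallelism of 128 is an assumption since the job's real value is not given in the thread.

```java
class KeyGroupSketch {

    // Assumed value for illustration only; not taken from the thread.
    static final int MAX_PARALLELISM = 128;

    // Simplified stand-in for key-group assignment: the key's hashCode()
    // is reduced to a group index in [0, MAX_PARALLELISM).
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        // Two keys the reproducer could produce for the same record when the
        // minute rolls over between the first and second evaluation.
        String keyWhenRouted = "2022-02-04T17:20:00Z|foo|bar";
        String keyWhenProcessed = "2022-02-04T17:21:00Z|foo|bar";

        // The record is sent to the subtask that owns the first group...
        System.out.println("group when routed:    " + keyGroupFor(keyWhenRouted));
        // ...but the re-evaluated key hashes to a different group.
        System.out.println("group when processed: " + keyGroupFor(keyWhenProcessed));
    }
}
```

In this stand-in the two keys land in different groups, so the receiving subtask would see a group outside the range it owns. That would also fit the observation that faster sources fail more often: more records are in flight when the minute changes.
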