I don't get it. I provided the reproducer. I implemented the KeySelector interface; does it need hashCode and equals as well?
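On the hashCode/equals question: the KeySelector itself is only a function and needs neither. What matters is the key object it returns, whose hashCode must be deterministic (with a consistent equals). Flink scrambles key.hashCode() with a murmur hash and takes the result modulo the max parallelism to get a key group. Each parallel subtask owns a contiguous range of key groups. The sketch below reproduces only that range arithmetic, as I understand it from Flink's KeyGroupRangeAssignment; the murmur step is omitted. It assumes maxParallelism = 128 and parallelism = 16, which is one combination consistent with the KeyGroupRange{startKeyGroup=96, endKeyGroup=103} in the exception quoted in this thread. Key group 39 belongs to a different subtask. As I understand it, the receiving keyed operator calls the KeySelector again on each record, so the error most likely means the key computed there differs from the one used to route the record.

```java
// Sketch of how key groups map to parallel subtasks.
// Assumption: mirrors the integer arithmetic of Flink's KeyGroupRangeAssignment;
// the murmur-hash step that turns key.hashCode() into a key group is omitted.
final class KeyGroupMath {

    // First key group (inclusive) owned by the subtask with this index.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask with this index.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Index of the subtask that owns a given key group.
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed, not stated in the thread
        int parallelism = 16;     // assumed, not stated in the thread
        int operatorIndex = 12;
        System.out.println("subtask " + operatorIndex + " owns key groups "
                + rangeStart(maxParallelism, parallelism, operatorIndex) + ".."
                + rangeEnd(maxParallelism, parallelism, operatorIndex));
        System.out.println("key group 39 belongs to subtask "
                + operatorForKeyGroup(maxParallelism, parallelism, 39));
    }
}
```

Running main prints that subtask 12 owns key groups 96..103 and that key group 39 belongs to subtask 4. A record whose key hashes to group 39 at the receiving subtask therefore cannot be handled by subtask 12.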
I'm attempting to process a clickstream. The key is based on the processing date/time rounded to the minute + domain name + path, so shouldn't keys like the ones below be valid?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article2
2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:
> Don't KeySelectors also need to be deterministic?
>
> The {@link KeySelector} allows to use deterministic objects for operations
> such as reduce, reduceGroup, join, coGroup, etc. *If invoked multiple times
> on the same object, the returned key must be the same.*
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco, here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like the exception is thrown when the source
> produces a high influx of records.
>
> The key is generated from 3 values: the date/time rounded to the minute and
> 2 strings. So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces a record in a
> loop and sleeps for a specified number of milliseconds, 100ms in this case.
> The lower the sleep delay, the faster records are produced and the more
> likely the exception is thrown. With a 100ms delay it's always thrown;
> setting it to 2000 to 3000ms will guarantee that it works.
> The original job uses a Kafka source, so it should technically be able to
> handle even a couple thousand records per second.
>
>
> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>
>> Ok, it's not my data either. I think it may be a volume issue.
>> I have managed to consistently reproduce the error. I'll upload a
>> reproducer ASAP.
>>
>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>
>>> Ok, so I tried to create a reproducer, but I couldn't reproduce it. But
>>> the actual job throws that error once in a while. So I'm wondering if
>>> maybe one of the incoming records is not valid, though I do validate
>>> records before they reach the key and window operators.
>>>
>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Actually maybe not, because with PrintSinkFunction it ran for a bit and
>>>> then it threw the error.
>>>>
>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Ok, maybe the Elasticsearch connector is causing the issue?
>>>>>
>>>>> If I use PrintSinkFunction, I get no error and my stats print as
>>>>> expected.
>>>>>
>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>> france...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> your hashCode and equals seem correct. Can you post a minimal
>>>>>> stream pipeline reproducer using this class?
>>>>>>
>>>>>> FG
>>>>>>
>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, I'm getting java.lang.IllegalArgumentException: Key group 39 is not
>>>>>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>>> directly using low level state access APIs, this is most likely caused by
>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>
>>>>>>> This is my class. Is my hashCode deterministic?
>>>>>>>
>>>>>>> public final class MyEventCountKey {
>>>>>>>     private final String countDateTime;
>>>>>>>     private final String domain;
>>>>>>>     private final String event;
>>>>>>>
>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>         this.domain = domain;
>>>>>>>         this.event = event;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getCountDateTime() {
>>>>>>>         return countDateTime;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getDomain() {
>>>>>>>         return domain;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getEvent() {
>>>>>>>         return event;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public String toString() {
>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public boolean equals(Object o) {
>>>>>>>         if (this == o) return true;
>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>                 && domain.equals(that.domain)
>>>>>>>                 && event.equals(that.event);
>>>>>>>     }
>>>>>>>
>>>>>>>     // Uses only the three final String fields; String.hashCode() is
>>>>>>>     // specified by the JDK, so equal keys hash the same on every JVM.
>>>>>>>     @Override
>>>>>>>     public int hashCode() {
>>>>>>>         final int prime = 31;
>>>>>>>         int result = 1;
>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>         return result;
>>>>>>>     }
>>>>>>> }
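The class above is deterministic on its own; the likelier problem is the one Chesnay raised. If the minute-rounded processing time is read from the clock inside the KeySelector, then two invocations on the same record can land on opposite sides of a minute boundary and return different keys, and therefore different key groups. That would also explain why a faster source fails more often: more records straddle a boundary. The sketch below is plain Java with no Flink dependency. It uses a hypothetical ClickEvent type and helper names that are not from the reproducer, and it uses String keys for brevity; the same reasoning applies to MyEventCountKey. It contrasts a clock-reading key with a key built only from a minute that is stamped onto the record once, before keying.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.function.Supplier;

// Hypothetical click record; countDateTime stays null until stampMinute sets it.
final class ClickEvent {
    final String domain;
    final String path;
    final String countDateTime;

    ClickEvent(String domain, String path, String countDateTime) {
        this.domain = domain;
        this.path = path;
        this.countDateTime = countDateTime;
    }
}

final class ClickKeys {
    // Current time from the supplied clock, rounded down to the minute,
    // e.g. "2022-02-04T17:20:00Z".
    static String currentMinute(Supplier<Instant> clock) {
        return clock.get().truncatedTo(ChronoUnit.MINUTES).toString();
    }

    // NON-deterministic: reads the clock on every call, so two calls on the
    // same record can disagree if they straddle a minute boundary.
    static String keyFromClock(ClickEvent e, Supplier<Instant> clock) {
        return currentMinute(clock) + "|" + e.domain + "|" + e.path;
    }

    // Run once per record (e.g. in a map step before keyBy) to fix the minute.
    static ClickEvent stampMinute(ClickEvent e, Supplier<Instant> clock) {
        return new ClickEvent(e.domain, e.path, currentMinute(clock));
    }

    // Deterministic: depends only on fields already stored in the record,
    // so it is safe as the body of a KeySelector.
    static String keyFromFields(ClickEvent e) {
        if (e.countDateTime == null) {
            throw new IllegalStateException("stampMinute must run before keying");
        }
        return e.countDateTime + "|" + e.domain + "|" + e.path;
    }
}
```

In a Flink job this would roughly correspond to stamping the minute in a map() and then calling keyBy with a KeySelector whose getKey only reads the stamped fields; that wiring is an assumption, not taken from the reproducer. The stamped key for a record first seen at 17:20:59.900Z stays "2022-02-04T17:20:00Z|foo|bar" no matter when it is re-evaluated, whereas the clock-based key flips to 17:21 if it is evaluated again after the minute rolls over.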