I don't get it. I provided the reproducer. I implemented the KeySelector
interface; does it need hashCode and equals as well?

I'm attempting to do click-stream counting, so the key is based on the
processing date/time rounded to the minute + domain name + path.

So shouldn't the keys below be valid?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3
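A minimal sketch of how keys like these could be built, assuming the minute comes from an `Instant` that the record already carries. `ClickKeys` and `minuteKey` are hypothetical names, and the `|` separator follows `MyEventCountKey.toString()` in the quoted code. "Rounded to the minute" is read here as truncated, so 10:02:59 still lands in the 10:02 bucket.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: builds "minute|domain|path" keys in the same shape
// as MyEventCountKey.toString(). The result depends only on the arguments.
final class ClickKeys {
    private ClickKeys() {}

    static String minuteKey(Instant timestamp, String domain, String path) {
        // truncatedTo(MINUTES) floors to the start of the minute (UTC)
        Instant minute = timestamp.truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domain + "|" + path;
    }
}
```

For example, `minuteKey(Instant.parse("2022-01-01T10:02:59Z"), "cnn.com", "/article1")` returns `2022-01-01T10:02:00Z|cnn.com|/article1`, the same key as a click at 10:02:05. Because the output depends only on the inputs, repeated calls for the same record agree.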

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
wrote:

> Don't KeySelectors also need to be deterministic?
>
> The {@link KeySelector} allows to use deterministic objects for operations
> such as reduce, reduceGroup, join, coGroup, etc. *If invoked multiple times
> on the same object, the returned key must be the same.*
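The thread doesn't show the KeySelector itself, so this is an assumption: if it derives the minute from the processing-time clock (for example by calling `Instant.now()` inside `getKey`), it breaks the contract quoted above. Flink evaluates the key selector on the sending side to pick the target subtask and again on the receiving side when it sets the key for state access; if a minute boundary passes between the two calls, the key and its key group change. The plain-Java sketch below uses a hypothetical `SimpleKeySelector` shaped like Flink's `KeySelector` (whose `getKey` also declares `throws Exception`), a hypothetical `ClickEvent`, and a scripted clock in place of wall time. It contrasts a clock-reading selector with one that only reads a minute bucket stamped onto the record once, e.g. in a `map()` before `keyBy()`.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical stand-in with the same shape as Flink's KeySelector<IN, KEY>.
interface SimpleKeySelector<IN, KEY> {
    KEY getKey(IN value);
}

// Hypothetical click event; minuteBucket is null until stamped upstream.
final class ClickEvent {
    final String domain;
    final String path;
    final String minuteBucket;

    ClickEvent(String domain, String path, String minuteBucket) {
        this.domain = domain;
        this.path = path;
        this.minuteBucket = minuteBucket;
    }
}

final class KeyingDemo {
    private KeyingDemo() {}

    // NOT deterministic: reads the clock on every call, so two calls for the
    // same event can straddle a minute boundary and return different keys.
    static SimpleKeySelector<ClickEvent, String> clockBasedSelector(Supplier<Instant> clock) {
        return e -> clock.get().truncatedTo(ChronoUnit.MINUTES) + "|" + e.domain + "|" + e.path;
    }

    // Reads the clock exactly once (e.g. in a map() before keyBy) and stores
    // the bucket on a new record.
    static ClickEvent stamp(ClickEvent e, Supplier<Instant> clock) {
        String bucket = clock.get().truncatedTo(ChronoUnit.MINUTES).toString();
        return new ClickEvent(e.domain, e.path, bucket);
    }

    // Deterministic: depends only on fields of the record.
    static final SimpleKeySelector<ClickEvent, String> FIELD_SELECTOR =
            e -> e.minuteBucket + "|" + e.domain + "|" + e.path;

    // Hands out the given instants in order; stands in for a wall clock.
    static Supplier<Instant> scriptedClock(String... isoInstants) {
        Iterator<String> it = Arrays.asList(isoInstants).iterator();
        return () -> Instant.parse(it.next());
    }
}
```

With a clock that reads 10:02:59.990 on the first call and 10:03:00.010 on the second, the clock-based selector returns a 10:02 key and then a 10:03 key for the same event, while the stamped record yields the same key every time. The stamped bucket is still processing time; it is just read once per record.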
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> Essentially, it looks like the exception is thrown when there's a high
> influx of records from the source.
>
> The key is generated from three values: the date/time rounded to the minute
> and two strings.
> So you will see keys like these:
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that produces a record in a loop and
> sleeps for a configurable number of milliseconds (100 ms in this case).
> The lower the sleep delay, the faster records are produced and the more
> likely the exception is thrown. With a 100 ms delay it's always thrown;
> setting the delay to 2000 to 3000 ms makes it work every time.
> The original job uses a Kafka source, so it should be able to handle even
> a couple thousand records per second.
>
>
> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>
>> OK, it's not my data either. I think it may be a volume issue. I have
>> managed to reproduce the error consistently. I'll upload a reproducer ASAP.
>>
>>
>>
>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>
>>> OK, so I tried to create a reproducer but couldn't reproduce it, yet the
>>> actual job throws that error once in a while. So I'm wondering if one of
>>> the incoming records is invalid, though I do validate records before they
>>> reach the key and window operators.
>>>
>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Actually, maybe not: with PrintSinkFunction it ran for a bit and then
>>>> threw the error.
>>>>
>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> OK, could the Elasticsearch connector be causing the issue?
>>>>>
>>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>>> expected.
>>>>>
>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>> france...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> your hashCode and equals seem correct. Can you post a minimal
>>>>>> stream pipeline reproducer using this class?
>>>>>>
>>>>>> FG
>>>>>>
>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, I'm getting:
>>>>>>>
>>>>>>> java.lang.IllegalArgumentException: Key group 39 is not in
>>>>>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>>>>>> using low level state access APIs, this is most likely caused by
>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
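The numbers in that message can be reproduced with plain arithmetic. The sketch below is modeled on Flink's `KeyGroupRangeAssignment` (owning subtask = keyGroup * parallelism / maxParallelism, with each subtask owning a contiguous range); the class name is hypothetical and the key-group hash is simplified, since Flink scrambles `hashCode()` with a murmur hash before taking the modulus while this sketch uses `Math.floorMod` directly. Assuming a max parallelism of 128 and a parallelism of 16 (one configuration that yields exactly the range 96 to 103; the thread doesn't state the job's settings), subtask 12 owns key groups 96 to 103, while key group 39 belongs to subtask 4. In other words, the key used to route the record to subtask 12 was not the key computed when state was accessed.

```java
// Hypothetical helper modeled on Flink's KeyGroupRangeAssignment arithmetic.
final class KeyGroupMath {
    private KeyGroupMath() {}

    // Simplified: Flink applies a murmur hash to key.hashCode() before the modulus.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index of the parallel subtask that owns a key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive {start, end} of the key groups owned by one subtask.
    static int[] keyGroupRangeFor(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }
}
```

With 128 and 16, the ranges split the key groups into 16 contiguous blocks of 8; subtask 4 owns 32 to 39.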
>>>>>>>
>>>>>>> This is my class. Is my hashCode deterministic?
>>>>>>>
>>>>>>> public final class MyEventCountKey {
>>>>>>>     private final String countDateTime;
>>>>>>>     private final String domain;
>>>>>>>     private final String event;
>>>>>>>
>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain,
>>>>>>>                            final String event) {
>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>         this.domain = domain;
>>>>>>>         this.event = event;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getCountDateTime() {
>>>>>>>         return countDateTime;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getDomain() {
>>>>>>>         return domain;
>>>>>>>     }
>>>>>>>
>>>>>>>     public String getEvent() {
>>>>>>>         return event;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public String toString() {
>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public boolean equals(Object o) {
>>>>>>>         if (this == o) return true;
>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>                 event.equals(that.event);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public int hashCode() {
>>>>>>>         // Depends only on the String fields, whose hashCode() the JDK specifies.
>>>>>>>         final int prime = 31;
>>>>>>>         int result = 1;
>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>         return result;
>>>>>>>     }
>>>>>>> }
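As Francesco notes, this class is fine: every field is a `String`, and `String.hashCode()` is specified in the `String` Javadoc, so equal field values produce equal hashes in every JVM. A more compact equivalent, assuming the fields are never null, can lean on `java.util.Objects`; `Objects.hash(a, b, c)` computes the same prime-31 combination as the hand-written version (the class name `CompactEventCountKey` is hypothetical).

```java
import java.util.Objects;

// Hypothetical compact variant of MyEventCountKey: same fields, same semantics.
final class CompactEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    CompactEventCountKey(String countDateTime, String domain, String event) {
        // Rejecting nulls up front keeps equals/hashCode safe to call.
        this.countDateTime = Objects.requireNonNull(countDateTime);
        this.domain = Objects.requireNonNull(domain);
        this.event = Objects.requireNonNull(event);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CompactEventCountKey that = (CompactEventCountKey) o;
        return countDateTime.equals(that.countDateTime)
                && domain.equals(that.domain)
                && event.equals(that.event);
    }

    @Override
    public int hashCode() {
        // Objects.hash computes 31 * (31 * (31 * 1 + h1) + h2) + h3 for
        // non-null fields, the same value as the prime-31 version above.
        return Objects.hash(countDateTime, domain, event);
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }
}
```

Either version only guarantees determinism for the values it is given; the KeySelector still has to pass the same values on every call for the same record.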
>>>>>>>
>>>>>>>
>
