The key selector works. It only causes an issue if too many keys are
produced in one shot. For example, if 100 "same" keys are produced for that
1 minute, it's OK. But if 101 are produced, the error happens.


If you look at the reproducer, at least that's what's happening.

How do you key a count based on the time? I have taken this from samples
online.

The key is that particular time for that particular URL path.

So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
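One way to key a per-minute count deterministically, sketched under assumptions: fix the minute once per record (when the record is created, or in a `map()` before `keyBy`) and let the key selector only read fields. `Click` and `ClickKeys.keyOf` are hypothetical names; in a Flink job `keyOf` would sit behind a `KeySelector<Click, String>`, for example `keyBy(ClickKeys::keyOf)`. The key format mirrors the reproducer's `2022-02-04T17:20:00Z|foo|bar`.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical click record. The timestamp is assigned once (at ingestion or in a
// map() before keyBy) and never recomputed, so every read sees the same value.
final class Click {
    final long timestampMillis;
    final String domain;
    final String path;

    Click(long timestampMillis, String domain, String path) {
        this.timestampMillis = timestampMillis;
        this.domain = domain;
        this.path = path;
    }
}

final class ClickKeys {
    private ClickKeys() {}

    // Pure function of the record's fields: no clock reads, so repeated calls on
    // the same Click return equal Strings (and therefore equal hash codes).
    static String keyOf(Click click) {
        Instant minute = Instant.ofEpochMilli(click.timestampMillis)
                .truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + click.domain + "|" + click.path;
    }
}
```

For a click stamped 2022-01-01T10:01:42.123Z on cnn.com/article1, `keyOf` returns `2022-01-01T10:01:00Z|cnn.com|/article1` no matter when or how often it is called. If processing time is what you want, the stamp can still come from the clock inside that `map()`; what matters is that it is taken once, not inside the selector. An alternative is to key on domain and path only and let a one-minute tumbling window supply the minute.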

On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
wrote:

> Your KeySelector doesn't need to implement hashCode, but given the same
> object it has to return the same key.
> In your reproducer the returned key will have different timestamps, and
> since the timestamp is included in the hashCode, they will be different
> each time.
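A plain-Java sketch of why that breaks, under stated assumptions: Flink evaluates the key selector once on the sending side to choose the target subtask and again inside the keyed operator, and the key itself is not shipped with the record. If the selector reads the current time, a record that waits in a network buffer across a minute boundary gets two different keys. `WallClockKeySelector` and its injected `Supplier<Instant>` clock are hypothetical; the clock is injected only so the example can step it forward deterministically.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.function.Supplier;

// Hypothetical selector that mimics the problematic design: the minute is taken
// from a clock at key-extraction time instead of from the record itself.
final class WallClockKeySelector {
    private final Supplier<Instant> now;

    WallClockKeySelector(Supplier<Instant> now) {
        this.now = now;
    }

    String getKey(String domainAndPath) {
        // Reads the clock on every call, so the result depends on when it runs,
        // not only on the input. This violates the KeySelector contract.
        Instant minute = now.get().truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domainAndPath;
    }
}
```

With the clock at 2022-01-01T10:01:59.950Z, a first call with `cnn.com|/article1` returns `2022-01-01T10:01:00Z|cnn.com|/article1`; 100 ms later the same input yields `2022-01-01T10:02:00Z|cnn.com|/article1`. The two keys are not equal, so they will generally land in different key groups, which matches the `Key group ... is not in KeyGroupRange` error in this thread. Faster sources put more records in flight at each minute boundary, which fits the observation that lower sleep delays fail more often.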
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it. I provided the reproducer. I implemented the KeySelector
> interface; does it need hashCode and equals as well?
>
> I'm attempting to process a click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So the keys below should be valid?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
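For reference, the counts these example records should produce once each record's minute is fixed, computed in plain Java. The keys are the listed values joined with `|`; `ExpectedCounts` is a hypothetical helper, not part of the reproducer.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class ExpectedCounts {
    private ExpectedCounts() {}

    // Counts occurrences of each "time|domain|path" key; the TreeMap keeps the
    // result sorted by key so it is easy to compare against job output.
    static Map<String, Long> count(List<String> keys) {
        return keys.stream()
                .collect(Collectors.groupingBy(k -> k, TreeMap::new, Collectors.counting()));
    }

    // The eight example records listed in the message above.
    static final List<String> EXAMPLE = List.of(
            "2022-01-01T10:02:00|cnn.com|/article1",
            "2022-01-01T10:02:00|cnn.com|/article1",
            "2022-01-01T10:02:00|cnn.com|/article1",
            "2022-01-01T10:02:00|cnn.com|/article2",
            "2022-01-01T10:03:00|cnn.com|/article1",
            "2022-01-01T10:03:00|cnn.com|/article1",
            "2022-01-01T10:03:00|cnn.com|/article3",
            "2022-01-01T10:03:00|cnn.com|/article3");
}
```

`count(EXAMPLE)` returns four keys: `/article1` at 10:02 with 3, `/article2` at 10:02 with 1, and `/article1` and `/article3` at 10:03 with 2 each.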
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> * The {@link KeySelector} allows to use deterministic objects for operations
>> * such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
>> * on the same object, the returned key must be the same.
>>
>>
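The contract quoted above can be turned into a quick unit-test check: apply the selector twice to the same object and require equal keys with equal hash codes. A minimal sketch; `KeyContract.isDeterministic` is hypothetical and `java.util.function.Function` stands in for Flink's `KeySelector`. It can only expose a violation, not prove determinism: a clock-based key usually passes unless a minute boundary falls between the two calls.

```java
import java.util.Objects;
import java.util.function.Function;

final class KeyContract {
    private KeyContract() {}

    // Returns true if two invocations on the same input agree on both equals and
    // hashCode, as the KeySelector Javadoc requires. It observes only these two
    // calls, so a true result is evidence, not proof.
    static <T, K> boolean isDeterministic(Function<T, K> selector, T value) {
        K first = selector.apply(value);
        K second = selector.apply(value);
        return Objects.equals(first, second)
                && Objects.hashCode(first) == Objects.hashCode(second);
    }
}
```

A selector built only from the record's fields, such as `s -> s.toUpperCase()`, passes; one that mixes in external state, such as a call counter, fails.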
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially, it looks like the exception is thrown when there's a high
>> influx of records produced from the source.
>>
>> The key is generated by 3 values: date/time rounded to the minute and 2
>> strings.
>> So you will see keys as follows...
>> 2022-02-04T17:20:00Z|foo|bar
>> 2022-02-04T17:21:00Z|foo|bar
>> 2022-02-04T17:22:00Z|foo|bar
>>
>> The reproducer has a custom source that basically produces a record in a
>> loop and sleeps for a specified number of milliseconds (100 ms in this case).
>> The lower the sleep delay, the faster records are produced and the more
>> likely the exception is thrown. With a 100 ms delay it's always thrown.
>> Setting it to 2000 to 3000 ms guarantees that it works.
>> The original job uses a Kafka source, so it should technically be able to
>> handle even a couple thousand records per second.
>>
>>
>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>
>>> OK, it's not my data either. I think it may be a volume issue. I have
>>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>>
>>>
>>>
>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> OK, so I tried to create a reproducer, but I couldn't reproduce it. But
>>>> the actual job throws that error once in a while. So I'm wondering if maybe
>>>> one of the incoming records is not valid, though I do validate records
>>>> before they reach the key and window operators.
>>>>
>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Actually, maybe not, because with PrintSinkFunction it ran for a bit and
>>>>> then threw the error.
>>>>>
>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> OK, maybe it's the Elasticsearch connector causing the issue?
>>>>>>
>>>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>>>> expected.
>>>>>>
>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>> france...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> your hashCode and equals seem correct. Can you post a minimal
>>>>>>> stream pipeline reproducer using this class?
>>>>>>>
>>>>>>> FG
>>>>>>>
>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, I'm getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>>>> directly using low level state access APIs, this is most likely caused
>>>>>>>> by non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>>
>>>>>>>> This is my class. Is my hashCode deterministic?
>>>>>>>>
>>>>>>>> public final class MyEventCountKey {
>>>>>>>>     private final String countDateTime;
>>>>>>>>     private final String domain;
>>>>>>>>     private final String event;
>>>>>>>>
>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>         this.domain = domain;
>>>>>>>>         this.event = event;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public String getCountDateTime() {
>>>>>>>>         return countDateTime;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public String getDomain() {
>>>>>>>>         return domain;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public String getEvent() {
>>>>>>>>         return event;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public String toString() {
>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>         if (this == o) return true;
>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>                 && domain.equals(that.domain)
>>>>>>>>                 && event.equals(that.event);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     // String.hashCode is fully specified, so equal field values
>>>>>>>>     // produce the same hash on every JVM.
>>>>>>>>     @Override
>>>>>>>>     public int hashCode() {
>>>>>>>>         final int prime = 31;
>>>>>>>>         int result = 1;
>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>         return result;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
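To read the exception message in the original question: Flink hashes each key into one of `maxParallelism` key groups and gives each parallel subtask a contiguous range of them. The sketch below restates that range arithmetic as I understand it from Flink's `KeyGroupRangeAssignment`; treat the formulas as an assumption and check them against your Flink version. `KeyGroupMath` is a hypothetical name. Assuming the default `maxParallelism` of 128 and a parallelism of 16 (one configuration consistent with a range of eight groups; the thread does not state it), subtask 12 owns key groups 96 to 103, while key group 39 belongs to subtask 4. In other words, the record was routed using one key and then re-keyed to a group owned by a different subtask.

```java
// Hypothetical restatement of Flink's key-group range arithmetic (an assumption;
// compare with KeyGroupRangeAssignment in your Flink version). Integer division
// is intentional: it rounds down exactly as the formulas require.
final class KeyGroupMath {
    private KeyGroupMath() {}

    // First key group owned by the given subtask.
    static int rangeStart(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by the given subtask (inclusive).
    static int rangeEnd(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask that owns the given key group.
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With these assumptions, `rangeStart(128, 16, 12)` is 96, `rangeEnd(128, 16, 12)` is 103, and `ownerOf(39, 128, 16)` is 4, whose own range is 32 to 39.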
>>
>
