Your KeySelector doesn't need to implement hashCode, but given the same object it has to return the same key. In your reproducer the returned key has a different timestamp on each call, and since the timestamp is part of the key's hashCode, the hash differs each time.
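
As a rough sketch of the difference, assuming the key's timestamp is taken from the clock at the moment the selector runs: the Click class and both method names below are made up for illustration and come from neither the reproducer nor Flink. The "now" parameter stands in for a call to Instant.now().

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class KeyDeterminismSketch {

    /** Hypothetical record type: the timestamp is a field of the event itself. */
    public static final class Click {
        final Instant eventTime;
        final String domain;
        final String path;

        public Click(Instant eventTime, String domain, String path) {
            this.eventTime = eventTime;
            this.domain = domain;
            this.path = path;
        }
    }

    /** Deterministic: the key depends only on fields of the record. */
    public static String keyFromRecord(Click c) {
        return c.eventTime.truncatedTo(ChronoUnit.MINUTES) + "|" + c.domain + "|" + c.path;
    }

    /** Not deterministic in practice: "now" stands in for Instant.now() at call time. */
    public static String keyFromClock(Click c, Instant now) {
        return now.truncatedTo(ChronoUnit.MINUTES) + "|" + c.domain + "|" + c.path;
    }

    public static void main(String[] args) {
        Click click = new Click(Instant.parse("2022-01-01T10:02:37Z"), "cnn.com", "/article1");

        // Same record, two calls: the key is identical, so this prints true.
        System.out.println(keyFromRecord(click).equals(keyFromRecord(click)));

        // Same record, but the clock crossed a minute boundary between the calls,
        // so the two keys differ and this prints false.
        String first = keyFromClock(click, Instant.parse("2022-01-01T10:02:59Z"));
        String second = keyFromClock(click, Instant.parse("2022-01-01T10:03:00Z"));
        System.out.println(first.equals(second));
    }
}
```

If the selector is invoked more than once for the same record, only the first variant is guaranteed to return the same key, and therefore the same hashCode, each time.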

On 07/02/2022 14:50, John Smith wrote:
I don't get it. I provided the reproducer. I implemented the KeySelector interface; does it need hashCode and equals as well?

I'm attempting to do click stream processing. So the key is based on the processing date/time rounded to the minute + domain name + path.

So the keys below should be valid?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org> wrote:

    Don't KeySelectors also need to be deterministic?

    The {@link KeySelector} allows to use deterministic objects for
    operations such as reduce, reduceGroup, join, coGroup, etc. If
    invoked multiple times on the same object, the returned key
    must be the same.


    On 04/02/2022 18:25, John Smith wrote:
    Hi Francesco,  here is the reproducer:
    https://github.com/javadevmtl/flink-key-reproducer

    So, essentially, it looks like the exception is thrown when
    there's a high influx of records produced by the source.

    The key is generated by 3 values: date/time rounded to the minute
    and 2 strings.
    So you will see keys as follows...
    2022-02-04T17:20:00Z|foo|bar
    2022-02-04T17:21:00Z|foo|bar
    2022-02-04T17:22:00Z|foo|bar

    The reproducer has a custom source that basically produces a
    record in a loop and sleeps for a specified number of
    milliseconds, 100ms in this case.
    The lower the sleep delay, the faster records are produced and
    the more likely the exception is to be thrown. With a 100ms delay
    it's always thrown. Setting the delay to 2000-3000ms guarantees
    that it works.
    The original job uses a Kafka Source so it should technically be
    able to handle even a couple thousand records per second.


    On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
    wrote:

        Ok it's not my data either. I think it may be a volume issue.
        I have managed to consistently reproduce the error. I'll
        upload a reproducer ASAP.



        On Thu, 3 Feb 2022 at 15:37, John Smith
        <java.dev....@gmail.com> wrote:

            Ok so I tried to create a reproducer but I couldn't
            reproduce it. But the actual job once in a while throws
            that error. So I'm wondering if maybe one of the records
            that comes in is not valid, though I do validate prior to
            getting to the key and window operators.

            On Thu, 3 Feb 2022 at 14:32, John Smith
            <java.dev....@gmail.com> wrote:

                Actually maybe not because with PrintSinkFunction it
                ran for a bit and then it threw the error.

                On Thu, 3 Feb 2022 at 14:24, John Smith
                <java.dev....@gmail.com> wrote:

                    Ok it may be the ElasticSearch connector causing
                    the issue?

                    If I use PrintSinkFunction then I get no error
                    and my stats print as expected.

                    On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
                    <france...@ververica.com> wrote:

                        Hi,
                        your hashCode and equals seem correct. Can
                        you post a minimal stream pipeline reproducer
                        using this class?

                        FG

                        On Tue, Feb 1, 2022 at 8:39 PM John Smith
                        <java.dev....@gmail.com> wrote:

                            Hi, getting
                            java.lang.IllegalArgumentException: Key
                            group 39 is not in
                            KeyGroupRange{startKeyGroup=96,
                            endKeyGroup=103}. Unless you're directly
                            using low level state access APIs, this
                            is most likely caused by
                            non-deterministic shuffle key (hashCode
                            and equals implementation).

                            This is my class, is my hashCode
                            deterministic?

                            public final class MyEventCountKey {
                                private final String countDateTime;
                                private final String domain;
                                private final String event;

                                public MyEventCountKey(final String countDateTime, final String domain, final String event) {
                                    this.countDateTime = countDateTime;
                                    this.domain = domain;
                                    this.event = event;
                                }

                                public String getCountDateTime() {
                                    return countDateTime;
                                }

                                public String getDomain() {
                                    return domain;
                                }

                                public String getEvent() {
                                    return event;
                                }

                                @Override
                                public String toString() {
                                    return countDateTime + "|" + domain + "|" + event;
                                }

                                // Two keys are equal when all three fields are equal.
                                @Override
                                public boolean equals(Object o) {
                                    if (this == o) return true;
                                    if (o == null || getClass() != o.getClass()) return false;
                                    MyEventCountKey that = (MyEventCountKey) o;
                                    return countDateTime.equals(that.countDateTime) &&
                                            domain.equals(that.domain) &&
                                            event.equals(that.event);
                                }

                                // Combines the hash codes of the same three fields used in equals.
                                @Override
                                public int hashCode() {
                                    final int prime = 31;
                                    int result = 1;
                                    result = prime * result + countDateTime.hashCode();
                                    result = prime * result + domain.hashCode();
                                    result = prime * result + event.hashCode();
                                    return result;
                                }
                            }

