Your KeySelector doesn't need to implement hashCode, but given the same
object it has to return the same key.
In your reproducer the returned key will have a different timestamp on
each call, and since the timestamp is included in the key's hashCode, the
hash will be different each time as well.
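To make the distinction concrete, here is a minimal sketch in plain Java. It is not taken from the reproducer: the Click class, the KeyFn interface (a stand-in for Flink's KeySelector) and both selector names are hypothetical, and it assumes the problematic selector reads the wall clock, as the description of the reproducer suggests. It contrasts a selector whose result depends on when it is called with one whose result depends only on the record.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class KeySelectorDeterminism {

    /** Hypothetical stand-in for Flink's KeySelector<IN, KEY>. */
    interface KeyFn<IN, KEY> {
        KEY getKey(IN value);
    }

    /** Hypothetical click event that carries its own timestamp. */
    static final class Click {
        final Instant timestamp;
        final String domain;
        final String path;

        Click(Instant timestamp, String domain, String path) {
            this.timestamp = timestamp;
            this.domain = domain;
            this.path = path;
        }
    }

    /** Non-deterministic: the key depends on the moment getKey is called. */
    static final KeyFn<Click, String> WALL_CLOCK_KEY = c ->
            Instant.now().truncatedTo(ChronoUnit.MINUTES) + "|" + c.domain + "|" + c.path;

    /** Deterministic: the key depends only on fields of the record. */
    static final KeyFn<Click, String> RECORD_TIME_KEY = c ->
            c.timestamp.truncatedTo(ChronoUnit.MINUTES) + "|" + c.domain + "|" + c.path;

    public static void main(String[] args) {
        Click click = new Click(Instant.parse("2022-02-04T17:20:42Z"), "foo", "bar");
        // Same object in, same key out, regardless of when or how often it is called.
        String first = RECORD_TIME_KEY.getKey(click);
        String second = RECORD_TIME_KEY.getKey(click);
        System.out.println(first);
        System.out.println(first.equals(second));
    }
}
```

With WALL_CLOCK_KEY, two calls on the same Click that straddle a minute boundary return different keys. One way around that is to stamp the time onto the record once, before keying, and have the selector read only that field.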
On 07/02/2022 14:50, John Smith wrote:
I don't get it. I provided the reproducer. I implemented the KeySelector
interface; does it need hashCode and equals as well?
I'm attempting to do click stream. So the key is based on processing
date/time rounded to the minute + domain name + path.
So the keys below should be valid?
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article2
2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3
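As plain values, keys like these group the way one would expect. The sketch below is only an illustration in plain Java, with a hypothetical ClickKeyCounts class and a "|" separator assumed for the composite key; it counts occurrences per key without involving Flink at all.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ClickKeyCounts {

    /** Counts how often each key occurs, preserving first-seen order. */
    static Map<String, Integer> countByKey(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The eight sample keys, joined with "|" (separator is an assumption).
        List<String> keys = Arrays.asList(
                "2022-01-01T10:02:00|cnn.com|/article1",
                "2022-01-01T10:02:00|cnn.com|/article1",
                "2022-01-01T10:02:00|cnn.com|/article1",
                "2022-01-01T10:02:00|cnn.com|/article2",
                "2022-01-01T10:03:00|cnn.com|/article1",
                "2022-01-01T10:03:00|cnn.com|/article1",
                "2022-01-01T10:03:00|cnn.com|/article3",
                "2022-01-01T10:03:00|cnn.com|/article3");
        countByKey(keys).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The eight sample records collapse into four distinct keys with counts 3, 1, 2 and 2. So the key values themselves are fine; what matters for keyBy is that the same record always yields the same one of them.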
On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler,
<ches...@apache.org> wrote:
Don't KeySelectors also need to be deterministic?
    The {@link KeySelector} allows to use deterministic objects for
    operations such as reduce, reduceGroup, join, coGroup, etc. If
    invoked multiple times on the same object, the returned key
    must be the same.
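Why this leads to a key-group error can be sketched as follows. This is a simplified model, not Flink's code: the KeyGroupSketch class and its methods are hypothetical, 128 key groups are assumed, and the hash scrambling Flink applies before the modulo is left out. The underlying assumption is that the key is computed once to route the record and again on the receiving operator.

```java
public class KeyGroupSketch {

    /** Assumed max parallelism, i.e. the total number of key groups. */
    static final int MAX_PARALLELISM = 128;

    /**
     * Simplified key-group assignment: the key's hash modulo the number of
     * key groups. Flink scrambles the hash first; that step is omitted here
     * because it does not change the argument.
     */
    static int keyGroup(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    /** True if the key group lies in the inclusive range owned by one subtask. */
    static boolean ownedBy(int keyGroup, int startKeyGroup, int endKeyGroup) {
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }

    public static void main(String[] args) {
        // First call: the sender computes the key to pick the target subtask.
        String keyAtSend = "2022-02-04T17:20:00Z|foo|bar";
        // Second call: the receiver recomputes the key after the minute rolled over.
        String keyAtReceive = "2022-02-04T17:21:00Z|foo|bar";

        System.out.println("group at send:    " + keyGroup(keyAtSend));
        System.out.println("group at receive: " + keyGroup(keyAtReceive));
        // If the two groups differ, the receiving subtask may not own the
        // second one, which is what the IllegalArgumentException reports.
        System.out.println("39 owned by [96, 103]: " + ownedBy(39, 96, 103));
    }
}
```

Under this model the error needs a record to be in flight while the minute changes, which would fit it showing up intermittently rather than on every record.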
On 04/02/2022 18:25, John Smith wrote:
Hi Francesco, here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer
So, essentially, it looks like the exception is thrown when the source
produces a high influx of records.
The key is generated from 3 values: date/time rounded to the minute
and 2 strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar
The reproducer has a custom source that basically produces a
record in a loop and sleeps for a specified number of
milliseconds, 100ms in this case.
The lower the sleep delay, the faster records are produced and the
higher the chance that the exception is thrown. With a 100ms delay it's
always thrown. Setting the delay to 2000-3000ms makes it work every time.
The original job uses a Kafka Source so it should technically be
able to handle even a couple thousand records per second.
On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
wrote:
Ok it's not my data either. I think it may be a volume issue.
I have managed to consistently reproduce the error. I'll
upload a reproducer ASAP.
On Thu, 3 Feb 2022 at 15:37, John Smith
<java.dev....@gmail.com> wrote:
Ok so I tried to create a reproducer but I couldn't
reproduce it. But the actual job once in a while throws
that error. So I'm wondering if maybe one of the records
that comes in is not valid, though I do validate prior to
getting to the key and window operators.
On Thu, 3 Feb 2022 at 14:32, John Smith
<java.dev....@gmail.com> wrote:
Actually maybe not because with PrintSinkFunction it
ran for a bit and then it threw the error.
On Thu, 3 Feb 2022 at 14:24, John Smith
<java.dev....@gmail.com> wrote:
Ok it may be the ElasticSearch connector causing
the issue?
If I use PrintSinkFunction then I get no error
and my stats print as expected.
On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
<france...@ververica.com> wrote:
Hi,
your hashCode and equals seem correct. Can
you post a minimal stream pipeline reproducer
using this class?
FG
On Tue, Feb 1, 2022 at 8:39 PM John Smith
<java.dev....@gmail.com> wrote:
Hi, getting
java.lang.IllegalArgumentException: Key
group 39 is not in
KeyGroupRange{startKeyGroup=96,
endKeyGroup=103}. Unless you're directly
using low level state access APIs, this
is most likely caused by
non-deterministic shuffle key (hashCode
and equals implementation).
This is my class, is my hashCode
deterministic?
public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

    public String getCountDateTime() {
        return countDateTime;
    }

    public String getDomain() {
        return domain;
    }

    public String getEven() {
        return event;
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyEventCountKey that = (MyEventCountKey) o;
        return countDateTime.equals(that.countDateTime) &&
                domain.equals(that.domain) &&
                event.equals(that.event);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }
}
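
One way to answer the question for the class in isolation is a small check like the sketch below. It uses a condensed copy of the class as posted (getters and toString left out) inside a hypothetical MyEventCountKeyCheck wrapper, and only tests that two keys built from the same three strings are equal and hash alike. It says nothing about how the strings are produced before the key is constructed.

```java
public class MyEventCountKeyCheck {

    /** Condensed copy of the posted key class with the same equals/hashCode logic. */
    static final class MyEventCountKey {
        private final String countDateTime;
        private final String domain;
        private final String event;

        MyEventCountKey(String countDateTime, String domain, String event) {
            this.countDateTime = countDateTime;
            this.domain = domain;
            this.event = event;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            MyEventCountKey that = (MyEventCountKey) o;
            return countDateTime.equals(that.countDateTime)
                    && domain.equals(that.domain)
                    && event.equals(that.event);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + countDateTime.hashCode();
            result = prime * result + domain.hashCode();
            result = prime * result + event.hashCode();
            return result;
        }
    }

    /** True if two keys built from the same strings are equal and share a hash. */
    static boolean consistent(String dateTime, String domain, String event) {
        MyEventCountKey a = new MyEventCountKey(dateTime, domain, event);
        MyEventCountKey b = new MyEventCountKey(dateTime, domain, event);
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(consistent("2022-02-04T17:20:00Z", "foo", "bar"));
    }
}
```

Because the hash is built only from String.hashCode(), which is specified by the Java language libraries, it comes out the same across JVMs and machines for the same field values.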