How to proper hashCode() for keys.

2022-02-01 Thread John Smith
Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation). This is my class, is

Re: How to proper hashCode() for keys.

2022-02-02 Thread Francesco Guardiani
Hi, your hashCode and equals seem correct. Can you post a minimal stream pipeline reproducer using this class? FG

Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it may be the ElasticSearch connector causing the issue? If I use PrintSinkFunction then I get no error and my stats print as expected.

Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Actually maybe not because with PrintSinkFunction it ran for a bit and then it threw the error.

Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok so I tried to create a reproducer but I couldn't reproduce it. But the actual job once in a while throws that error. So I'm wondering if maybe one of the records that comes in is not valid, though I do validate prior to getting to the key and window operators.

Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it's not my data either. I think it may be a volume issue. I have managed to consistently reproduce the error. I'll upload a reproducer ASAP.

Re: How to proper hashCode() for keys.

2022-02-04 Thread John Smith
Hi Francesco, here is the reproducer: https://github.com/javadevmtl/flink-key-reproducer So, essentially it looks like when there's a high influx of records produced from the source that the Exception is thrown. The key is generated by 3 values: date/time rounded to the minute and 2 strings. So

Re: How to proper hashCode() for keys.

2022-02-06 Thread Chesnay Schepler
Don't KeySelectors also need to be deterministic? "The {@link KeySelector} allows to use deterministic objects for operations such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times on the same object, the returned key must be the same."

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
I don't get it. I provided the reproducer. I implemented the KeySelector interface; does it need hashCode and equals as well? I'm attempting to do clickstream counting, so the key is based on the processing date/time rounded to the minute + domain name + path. So these should be valid, as below? 2022-01-01T10:02:

Re: How to proper hashCode() for keys.

2022-02-07 Thread Chesnay Schepler
Your KeySelector doesn't need to implement hashCode, but given the same object it has to return the same key. In your reproducer the returned key will have different timestamps, and since the timestamp is included in the hashCode, the hash codes will be different each time.
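To make the link between an unstable hashCode and the "Key group 39 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}" message concrete, here is a minimal plain-Java sketch of how a key's hash could map to a key group and a subtask. It is modeled on my understanding of Flink's key-group assignment, not copied from it. The class and method names are hypothetical, the hash step is simplified (Flink scrambles the hash before taking the modulo), and the values maxParallelism = 128 and parallelism = 16 are assumptions chosen only because they yield the range 96..103 from the error message.

```java
// Hypothetical sketch, not Flink code: shows why a key whose hash changes
// between two evaluations can land in a key group owned by another subtask.
final class KeyGroupSketch {

    // Simplified stand-in: Flink applies a murmur hash before the modulo (assumption).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Subtask index that owns a given key group.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group owned by a subtask.
    static int rangeStart(int subtask, int maxParallelism, int parallelism) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by a subtask (inclusive).
    static int rangeEnd(int subtask, int maxParallelism, int parallelism) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value
        int parallelism = 16;     // assumed value
        // Subtask 12 owns key groups 96..103 under these assumptions.
        System.out.println(rangeStart(12, maxParallelism, parallelism)
                + ".." + rangeEnd(12, maxParallelism, parallelism)); // prints 96..103
        // Key group 39 belongs to subtask 4, so subtask 12 would reject it.
        System.out.println(subtaskFor(39, maxParallelism, parallelism)); // prints 4
    }
}
```

If the key is hashed once when the record is routed and a second time when state is accessed, and the two hashes differ, the second key group can fall outside the range of the subtask that received the record, which is the situation the exception describes.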

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
The key selector works. It only causes an issue if there are too many keys produced in one shot. For example, if 100 "same" keys are produced for that 1 minute it's ok. But if 101 are produced the error happens. If you look at the reproducer, at least that's what's happening. How do you key a count ba

Re: How to proper hashCode() for keys.

2022-02-07 Thread David Morávek
> The key selector works.
No it does not ;) It depends on the system time so it's not deterministic (you can get different keys for the very same element).
> How do you key a count based on the time. I have taken this from samples online.
This is what the windowing is for. You basically wan

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Maybe there's a misunderstanding. But basically I want to do clickstream count for a given "url" and for simplicity and accuracy of the count base it on processing time (event time doesn't matter as long as I get a total of clicks at that given processing time) So regardless of the event time. I w

Re: How to proper hashCode() for keys.

2022-02-07 Thread Ali Bahadir Zeybek
Hello John, During the lifecycle of the execution for a given event, the key information is not passed between operators; it is recomputed from the given key selector every time a (keyed) operator sees the event. Therefore, the same event, within the same pipeline, could be

Re: How to proper hashCode() for keys.

2022-02-07 Thread Jing Ge
Hi John, your getKey() implementation shows that it is not deterministic, since calling it with the same click instance multiple times will return different keys. For example a call at 12:01:59.950 and a call at 12:02:00.050 with the same click instance will return two different keys: 2022-04-07T
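The minute-boundary case described above can be sketched in plain Java. This is only an illustration: the class and method names are hypothetical, the date is arbitrary, and the domain and path values are placeholders. It does not use Flink's KeySelector interface; it just contrasts a key derived from the clock with a key derived from the record's own fields.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch: a clock-based key versus a field-based key.
final class MinuteKeySketch {

    // Non-deterministic in practice: the result depends on when it is called.
    static String clockKey(Instant now, String domain, String path) {
        return now.truncatedTo(ChronoUnit.MINUTES) + "|" + domain + "|" + path;
    }

    // Deterministic: depends only on the fields of the record.
    static String fieldKey(String domain, String path) {
        return domain + "|" + path;
    }

    public static void main(String[] args) {
        // Two evaluations for the same click, 100 ms apart, across a minute boundary.
        Instant first = Instant.parse("2022-02-07T12:01:59.950Z");
        Instant second = Instant.parse("2022-02-07T12:02:00.050Z");
        System.out.println(clockKey(first, "mydomain.com", "some-article"));
        // prints 2022-02-07T12:01:00Z|mydomain.com|some-article
        System.out.println(clockKey(second, "mydomain.com", "some-article"));
        // prints 2022-02-07T12:02:00Z|mydomain.com|some-article
        System.out.println(fieldKey("mydomain.com", "some-article"));
        // prints mydomain.com|some-article
    }
}
```

The two clock-based keys differ even though the click is the same, while the field-based key is identical on every call.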

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Ok I think Ali's solution makes the most sense to me. I'll try it and let you know.

Re: How to proper hashCode() for keys.

2022-02-11 Thread John Smith
Ok I used the method suggested by Ali. The error is gone. But now I see multiple counts emitted for the same key... DataStream slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source") .uid(kafkaTopic).name(kafkaTopic) .setParallelism(kafkaParallelism

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Currently you are grouping the elements two times based on some time attribute, one while keying - with event time - and one while windowing - with processing time. Therefore, the windowing mechanism produces a new window computation when you see an element with the same key but arrive

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, thanks. As previously mentioned, processing time. So regardless of when the event was generated, I want to count all events I have right now (as soon as they are seen by the Flink job).

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Then may I ask you why you need to use a time attribute as part of your key? Why not just key by the fields like `mydomain.com` and `some-article` in your example and use only window operator for grouping elements based on time? Sincerely, Ali

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Because I want to group them for the last X minutes. In this case last 1 minute.

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, That is exactly what the window operator does for you. Can you please check the documentation[1] and let us know what part of the window operator alone does not suffice for the use case? Sincerely, Ali [1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastre

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, I get that, but I want to output that key so I can store it in Elastic grouped by the minute. I had explained with data examples above. But just to be sure, let's pretend the current WALL time is 2022-02-14T11:38:01.123Z and I get the clicks below: event time here (ignored/not read)|cnn.co

Re: How to proper hashCode() for keys.

2022-02-16 Thread Ali Bahadir Zeybek
Hello John, The requirement you have can be achieved by having a process window function in order to enrich the aggregate data with metadata information of the window. Please have a look at the training example[1] to see how to access the window information within a process window function. Since
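The idea of keying only by domain and path, and attaching the minute afterwards from the window's metadata, can be simulated in plain Java. This is a sketch under stated assumptions and not Flink code: the class and method names are hypothetical, the window is a fixed one-minute tumbling window, and the "processing time" is passed in as a plain timestamp. In a real job the window operator and a process window function would play the roles of `windowStart` and the output row.

```java
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: the key has no time component; the minute is
// taken from the window that the arrival time falls into.
final class WindowCountSketch {

    static final long WINDOW_MILLIS = 60_000L; // one-minute tumbling window (assumed)

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long processingTimeMillis) {
        return processingTimeMillis - Math.floorMod(processingTimeMillis, WINDOW_MILLIS);
    }

    // Counts clicks per (window start, domain, path); arrays are parallel.
    static Map<String, Integer> countPerWindow(long[] arrivals, String[] domains, String[] paths) {
        Map<String, Integer> rows = new TreeMap<>();
        for (int i = 0; i < arrivals.length; i++) {
            String key = domains[i] + "|" + paths[i];            // deterministic key
            Instant start = Instant.ofEpochMilli(windowStart(arrivals[i]));
            rows.merge(start + "|" + key, 1, Integer::sum);      // enrich with window start
        }
        return rows;
    }

    public static void main(String[] args) {
        long[] arrivals = {
            Instant.parse("2022-02-14T11:38:01.123Z").toEpochMilli(),
            Instant.parse("2022-02-14T11:38:30.000Z").toEpochMilli(),
            Instant.parse("2022-02-14T11:39:02.000Z").toEpochMilli()
        };
        String[] domains = {"mydomain.com", "mydomain.com", "mydomain.com"};
        String[] paths = {"some-article", "some-article", "some-article"};
        // Two clicks fall into the 11:38 window and one into the 11:39 window.
        countPerWindow(arrivals, domains, paths)
                .forEach((row, count) -> System.out.println(row + " -> " + count));
    }
}
```

Each output row carries the minute it belongs to, so it can be stored grouped by minute, while the key used for partitioning stays the same for every evaluation of the same click.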