Re: How to proper hashCode() for keys.

2022-02-16 Thread Ali Bahadir Zeybek
Hello John,

The requirement you have can be achieved with a process window function,
which lets you enrich the aggregated data with the window's metadata.
Please have a look at the training example[1] to see how to access the
window information within a process window function.

Sincerely,

Ali

[1]:
https://github.com/ververica/flink-training/blob/master/troubleshooting/introduction/src/main/java/com/ververica/flink/training/exercises/TroubledStreamingJob.java#L155
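
For reference, the pattern from that example can be sketched roughly as
follows. This is a sketch only, not the training code verbatim: `MyEvent`,
`MyEventCount`, and the `Tuple2<String, String>` key type are assumptions
taken from earlier messages in this thread.

```java
import java.time.Instant;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: count the elements of each (domain, path) key per window and
// attach the window's own start time as the minute bucket.
public class CountPerWindow
        extends ProcessWindowFunction<MyEvent, MyEventCount, Tuple2<String, String>, TimeWindow> {

    @Override
    public void process(Tuple2<String, String> key,
                        Context context,
                        Iterable<MyEvent> events,
                        Collector<MyEventCount> out) throws Exception {
        long count = 0;
        for (MyEvent ignored : events) {
            count++;
        }
        // context.window().getStart() is the window's start timestamp, i.e.
        // the wall time rounded down to the minute for a 1-minute window.
        Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
        out.collect(new MyEventCount(windowStart.toString(), key.f0, key.f1, count));
    }
}
```

With this approach the minute bucket comes from the window metadata, so the
key itself only needs the domain and path.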

On Mon, Feb 14, 2022 at 5:43 PM John Smith  wrote:

> Hi, I get that but I want to output that key so I can store it in Elastic
> grouped by the minute.
>
> I explained this with data examples above, but just to be sure:
>
> Let's pretend the current wall time is 2022-02-14T11:38:01.123Z and I get
> the clicks below:
>
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/another-article
> event time here (ignored/not read)|cnn.com|/some-article
>
> The output should be...
>
> 2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
> cnn.com|some-article  count = 3
> 2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
> cnn.com|another-article  count = 1
>
>
>
>
>
> On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> That is exactly what the window operator does for you. Can you please check
>> the documentation[1] and let us know what part of the window operator alone
>> does not suffice for the use case?
>>
>> Sincerely,
>>
>> Ali
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows
>>
>> On Mon, Feb 14, 2022 at 4:03 PM John Smith 
>> wrote:
>>
>>> Because I want to group them for the last X minutes. In this case last 1
>>> minute.
>>>
>>> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
>>> wrote:
>>>
 Hello John,

 Then may I ask you why you need to use a time attribute as part of your
 key?
 Why not just key by the fields like `mydomain.com` and `some-article`
 in your
 example and use only window operator for grouping elements based on
 time?

 Sincerely,

 Ali

 On Mon, Feb 14, 2022 at 3:55 PM John Smith 
 wrote:

> Hi, thanks. As previously mentioned, processing time. So regardless of when
> the event was generated, I want to count all events I have right now (as
> soon as they are seen by the Flink job).
>
> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> Currently you are grouping the elements two times based on some time
>> attribute, one while keying - with event time - and one while
>> windowing - with
>> processing time. Therefore, the windowing mechanism produces a new
>> window
>> computation when you see an element with the same key but arrived
>> later from
>> the previous window start and end timestamps. Can you please clarify
>> with
>> which notion of time you would like to handle the stream of data?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>> wrote:
>>
>>> Ok I used the method suggested by Ali. The error is gone. But now I
>>> see multiple counts emitted for the same key...
>>>
>>> DataStream slStream = env.fromSource(kafkaSource,
>>>         WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>     .uid(kafkaTopic).name(kafkaTopic)
>>>     .setParallelism(kafkaParallelism)
>>>     // Timestamp in GMT created here, rounded down to the closest minute.
>>>     .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>>>     .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector())
>>>     // Tumbling window of 1 minute.
>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>>>
>>>
>>>
>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>
>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":3542}
>>> -
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16503}
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":70}
>>> -
>>>
>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16037}
>>> {"countId":"2022

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, I get that but I want to output that key so I can store it in Elastic
grouped by the minute.

I explained this with data examples above, but just to be sure:

Let's pretend the current wall time is 2022-02-14T11:38:01.123Z and I get
the clicks below:

event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/another-article
event time here (ignored/not read)|cnn.com|/some-article

The output should be...

2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
cnn.com|some-article  count = 3
2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
cnn.com|another-article  count = 1
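
As a side note, the rounding shown in the expected output above can be
reproduced with plain `java.time`. This is a minimal sketch: `floorToMinute`
is a hypothetical helper name, and it only covers the 1-minute window case.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class WindowKeyDemo {
    // Round a wall-clock instant down to the start of its minute (UTC).
    public static Instant floorToMinute(Instant t) {
        return t.truncatedTo(ChronoUnit.MINUTES);
    }

    public static void main(String[] args) {
        Instant wall = Instant.parse("2022-02-14T11:38:01.123Z");
        // Prints 2022-02-14T11:38:00Z
        System.out.println(floorToMinute(wall));
    }
}
```

Note that `Instant.toString()` omits a zero fractional second, so the
printed value is `...T11:38:00Z` rather than the `...T11:38:00.000Z` form
written above.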





On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> That is exactly what the window operator does for you. Can you please check
> the documentation[1] and let us know what part of the window operator alone
> does not suffice for the use case?
>
> Sincerely,
>
> Ali
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows
>
> On Mon, Feb 14, 2022 at 4:03 PM John Smith  wrote:
>
>> Because I want to group them for the last X minutes. In this case last 1
>> minute.
>>
>> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
>> wrote:
>>
>>> Hello John,
>>>
>>> Then may I ask you why you need to use a time attribute as part of your
>>> key?
>>> Why not just key by the fields like `mydomain.com` and `some-article`
>>> in your
>>> example and use only window operator for grouping elements based on time?
>>>
>>> Sincerely,
>>>
>>> Ali
>>>
>>> On Mon, Feb 14, 2022 at 3:55 PM John Smith 
>>> wrote:
>>>
 Hi, thanks. As previously mentioned, processing time. So regardless of when
 the event was generated, I want to count all events I have right now (as
 soon as they are seen by the Flink job).

 On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
 wrote:

> Hello John,
>
> Currently you are grouping the elements two times based on some time
> attribute, one while keying - with event time - and one while
> windowing - with
> processing time. Therefore, the windowing mechanism produces a new
> window
> computation when you see an element with the same key but arrived
> later from
> the previous window start and end timestamps. Can you please clarify
> with
> which notion of time you would like to handle the stream of data?
>
> Sincerely,
>
> Ali
>
> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
> wrote:
>
>> Ok I used the method suggested by Ali. The error is gone. But now I
>> see multiple counts emitted for the same key...
>>
>> DataStream slStream = env.fromSource(kafkaSource, 
>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>> .uid(kafkaTopic).name(kafkaTopic)
>> .setParallelism(kafkaParallelism)
>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, 
>> "message")) <-- Timestamp in GMT created here rounded to the closest 
>> minute down.
>> .uid("map-json-logs").name("map-json-logs");
>>
>> slStream.keyBy(new MinutesKeySelector())
>> 
>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>> < Tumbling window of 1 minute.
>>
>>
>>
>> So below you will see a new count was emitted at 16:51 and 16:55
>>
>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":3542}
>> -
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16503}
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":70}
>> -
>>
>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16037}
>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18679}
>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":17697}
>> -
>>
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18066}
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John,

That is exactly what the window operator does for you. Can you please check
the documentation[1] and let us know what part of the window operator alone
does not suffice for the use case?

Sincerely,

Ali

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows

On Mon, Feb 14, 2022 at 4:03 PM John Smith  wrote:

> Because I want to group them for the last X minutes. In this case last 1
> minute.
>
> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> Then may I ask you why you need to use a time attribute as part of your
>> key?
>> Why not just key by the fields like `mydomain.com` and `some-article` in
>> your
>> example and use only window operator for grouping elements based on time?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Mon, Feb 14, 2022 at 3:55 PM John Smith 
>> wrote:
>>
>>> Hi, thanks. As previously mentioned, processing time. So regardless of when
>>> the event was generated, I want to count all events I have right now (as
>>> soon as they are seen by the Flink job).
>>>
>>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
>>> wrote:
>>>
 Hello John,

 Currently you are grouping the elements two times based on some time
 attribute, one while keying - with event time - and one while windowing
 - with
 processing time. Therefore, the windowing mechanism produces a new
 window
 computation when you see an element with the same key but arrived later
 from
 the previous window start and end timestamps. Can you please clarify
 with
 which notion of time you would like to handle the stream of data?

 Sincerely,

 Ali

 On Fri, Feb 11, 2022 at 6:43 PM John Smith 
 wrote:

> Ok I used the method suggested by Ali. The error is gone. But now I
> see multiple counts emitted for the same key...
>
> DataStream slStream = env.fromSource(kafkaSource, 
> WatermarkStrategy.noWatermarks(), "Kafka Source")
> .uid(kafkaTopic).name(kafkaTopic)
> .setParallelism(kafkaParallelism)
> .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
> <-- Timestamp in GMT created here rounded to the closest minute down.
> .uid("map-json-logs").name("map-json-logs");
>
> slStream.keyBy(new MinutesKeySelector())
> 
> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
> < Tumbling window of 1 minute.
>
>
>
> So below you will see a new count was emitted at 16:51 and 16:55
>
> {"countId":"2022-02-11T16:50:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
> mydomain.com","uri":"/some-article","count":3542}
> -
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16503}
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":70}
> -
>
> {"countId":"2022-02-11T16:52:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16037}
> {"countId":"2022-02-11T16:53:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18679}
> {"countId":"2022-02-11T16:54:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17697}
> -
>
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18066}
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":58}
> -
> {"countId":"2022-02-11T16:56:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17489}
>
>
>
>
> On Mon, Feb 7, 2022 at 12:44 PM John Smith 
> wrote:
>
>> Ok I think Ali's solution makes the most sense to me. I'll try it and
>> let you know.
>>
>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>>
>>> Hi John,
>>>
>>> your getKey() implementation shows that it is not deterministic,
>>> since calling it with the same click instance multiple times will return
>>> different keys. For example a call at 12:01:59.950 and a call at
>>> 12:02:00.050 with the same click instance will return two different 
>>> keys:
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>> 2022-04-07T12:0

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Because I want to group them for the last X minutes. In this case last 1
minute.

On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> Then may I ask you why you need to use a time attribute as part of your
> key?
> Why not just key by the fields like `mydomain.com` and `some-article` in
> your
> example and use only window operator for grouping elements based on time?
>
> Sincerely,
>
> Ali
>
> On Mon, Feb 14, 2022 at 3:55 PM John Smith  wrote:
>
>> Hi, thanks. As previously mentioned, processing time. So regardless of when
>> the event was generated, I want to count all events I have right now (as
>> soon as they are seen by the Flink job).
>>
>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
>> wrote:
>>
>>> Hello John,
>>>
>>> Currently you are grouping the elements two times based on some time
>>> attribute, one while keying - with event time - and one while windowing
>>> - with
>>> processing time. Therefore, the windowing mechanism produces a new window
>>> computation when you see an element with the same key but arrived later
>>> from
>>> the previous window start and end timestamps. Can you please clarify with
>>> which notion of time you would like to handle the stream of data?
>>>
>>> Sincerely,
>>>
>>> Ali
>>>
>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>>> wrote:
>>>
 Ok I used the method suggested by Ali. The error is gone. But now I see
 multiple counts emitted for the same key...

 DataStream slStream = env.fromSource(kafkaSource, 
 WatermarkStrategy.noWatermarks(), "Kafka Source")
 .uid(kafkaTopic).name(kafkaTopic)
 .setParallelism(kafkaParallelism)
 .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
 <-- Timestamp in GMT created here rounded to the closest minute down.
 .uid("map-json-logs").name("map-json-logs");

 slStream.keyBy(new MinutesKeySelector())
 
 .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
 < Tumbling window of 1 minute.



 So below you will see a new count was emitted at 16:51 and 16:55

 {"countId":"2022-02-11T16:50:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
 mydomain.com","uri":"/some-article","count":3542}
 -
 {"countId":"2022-02-11T16:51:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
 mydomain.com","uri":"/some-article","count":16503}
 {"countId":"2022-02-11T16:51:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
 mydomain.com","uri":"/some-article","count":70}
 -

 {"countId":"2022-02-11T16:52:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
 mydomain.com","uri":"/some-article","count":16037}
 {"countId":"2022-02-11T16:53:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
 mydomain.com","uri":"/some-article","count":18679}
 {"countId":"2022-02-11T16:54:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
 mydomain.com","uri":"/some-article","count":17697}
 -

 {"countId":"2022-02-11T16:55:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
 mydomain.com","uri":"/some-article","count":18066}
 {"countId":"2022-02-11T16:55:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
 mydomain.com","uri":"/some-article","count":58}
 -
 {"countId":"2022-02-11T16:56:00Z|mydomain.com
 |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
 mydomain.com","uri":"/some-article","count":17489}




 On Mon, Feb 7, 2022 at 12:44 PM John Smith 
 wrote:

> Ok I think Ali's solution makes the most sense to me. I'll try it and
> let you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic,
>> since calling it with the same click instance multiple times will return
>> different keys. For example a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith 
>> wrote:
>>
>>> Maybe there's a misunderstanding. But basically I want to
>>> do clickstream count for a given "url" and for simplicity and accuracy 
>>> of
>>> the count base it on processing time (event time doesn't matter as long 
>>> as
>>> I get a total of clicks at that given processing time)
>>>
>>> So regardless of the event time. I wa

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John,

Then may I ask you why you need to use a time attribute as part of your key?
Why not just key by the fields like `mydomain.com` and `some-article` in
your
example and use only window operator for grouping elements based on time?
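
Concretely, that suggestion could be sketched like this. It is a sketch
only: `MyEvent` and its accessors are assumed from earlier messages in the
thread, and the downstream aggregation is elided.

```java
// Key only by the event's own fields; the tumbling window supplies the
// time bucket, so the key stays deterministic across calls.
slStream
    .keyBy(new KeySelector<MyEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey(MyEvent click) {
            return Tuple2.of(click.getDomain(), click.getPath());
        }
    })
    .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)));
    // ... then aggregate/process as before
```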

Sincerely,

Ali

On Mon, Feb 14, 2022 at 3:55 PM John Smith  wrote:

> Hi, thanks. As previously mentioned, processing time. So regardless of when
> the event was generated, I want to count all events I have right now (as
> soon as they are seen by the Flink job).
>
> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
> wrote:
>
>> Hello John,
>>
>> Currently you are grouping the elements two times based on some time
>> attribute, one while keying - with event time - and one while windowing -
>> with
>> processing time. Therefore, the windowing mechanism produces a new window
>> computation when you see an element with the same key but arrived later
>> from
>> the previous window start and end timestamps. Can you please clarify with
>> which notion of time you would like to handle the stream of data?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>> wrote:
>>
>>> Ok I used the method suggested by Ali. The error is gone. But now I see
>>> multiple counts emitted for the same key...
>>>
>>> DataStream slStream = env.fromSource(kafkaSource, 
>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>> .uid(kafkaTopic).name(kafkaTopic)
>>> .setParallelism(kafkaParallelism)
>>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
>>> <-- Timestamp in GMT created here rounded to the closest minute down.
>>> .uid("map-json-logs").name("map-json-logs");
>>>
>>> slStream.keyBy(new MinutesKeySelector())
>>> 
>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>>> < Tumbling window of 1 minute.
>>>
>>>
>>>
>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>
>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":3542}
>>> -
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16503}
>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":70}
>>> -
>>>
>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":16037}
>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":18679}
>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":17697}
>>> -
>>>
>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":18066}
>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":58}
>>> -
>>> {"countId":"2022-02-11T16:56:00Z|mydomain.com
>>> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
>>> mydomain.com","uri":"/some-article","count":17489}
>>>
>>>
>>>
>>>
>>> On Mon, Feb 7, 2022 at 12:44 PM John Smith 
>>> wrote:
>>>
 Ok I think Ali's solution makes the most sense to me. I'll try it and
 let you know.

 On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:

> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> different keys. For example a call at 12:01:59.950 and a call at
> 12:02:00.050 with the same click instance will return two different keys:
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>
> best regards
> Jing
>
> On Mon, Feb 7, 2022 at 5:07 PM John Smith 
> wrote:
>
>> Maybe there's a misunderstanding. But basically I want to
>> do clickstream count for a given "url" and for simplicity and accuracy of
>> the count base it on processing time (event time doesn't matter as long 
>> as
>> I get a total of clicks at that given processing time)
>>
>> So regardless of the event time. I want all clicks for the current
>> processing time rounded to the minute per link.
>>
>> So, if now was 2022-04-07T12:01:00.000Z
>>
>> Then I would want the following result...
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>> 2022-04-07

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, thanks. As previously mentioned, processing time. So regardless of when
the event was generated, I want to count all events I have right now (as
soon as they are seen by the Flink job).

On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> Currently you are grouping the elements two times based on some time
> attribute, one while keying - with event time - and one while windowing -
> with
> processing time. Therefore, the windowing mechanism produces a new window
> computation when you see an element with the same key but arrived later
> from
> the previous window start and end timestamps. Can you please clarify with
> which notion of time you would like to handle the stream of data?
>
> Sincerely,
>
> Ali
>
> On Fri, Feb 11, 2022 at 6:43 PM John Smith  wrote:
>
>> Ok I used the method suggested by Ali. The error is gone. But now I see
>> multiple counts emitted for the same key...
>>
>> DataStream slStream = env.fromSource(kafkaSource, 
>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>> .uid(kafkaTopic).name(kafkaTopic)
>> .setParallelism(kafkaParallelism)
>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
>> <-- Timestamp in GMT created here rounded to the closest minute down.
>> .uid("map-json-logs").name("map-json-logs");
>>
>> slStream.keyBy(new MinutesKeySelector())
>> 
>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>> < Tumbling window of 1 minute.
>>
>>
>>
>> So below you will see a new count was emitted at 16:51 and 16:55
>>
>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":3542}
>> -
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16503}
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":70}
>> -
>>
>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16037}
>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18679}
>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":17697}
>> -
>>
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18066}
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":58}
>> -
>> {"countId":"2022-02-11T16:56:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":17489}
>>
>>
>>
>>
>> On Mon, Feb 7, 2022 at 12:44 PM John Smith 
>> wrote:
>>
>>> Ok I think Ali's solution makes the most sense to me. I'll try it and
>>> let you know.
>>>
>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>>>
 Hi John,

 your getKey() implementation shows that it is not deterministic, since
 calling it with the same click instance multiple times will return
 different keys. For example a call at 12:01:59.950 and a call at
 12:02:00.050 with the same click instance will return two different keys:

 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
 2022-04-07T12:02:00.000Z|cnn.com|some-article-name

 best regards
 Jing

 On Mon, Feb 7, 2022 at 5:07 PM John Smith 
 wrote:

> Maybe there's a misunderstanding. But basically I want to
> do clickstream count for a given "url" and for simplicity and accuracy of
> the count base it on processing time (event time doesn't matter as long as
> I get a total of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> 
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
>

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John,

Currently you are grouping the elements twice based on a time attribute:
once while keying (with event time) and once while windowing (with
processing time). Therefore, the windowing mechanism produces a new window
computation when it sees an element with the same key that arrived later
than the previous window's start and end timestamps. Can you please clarify
which notion of time you would like to use to handle the stream of data?

Sincerely,

Ali

On Fri, Feb 11, 2022 at 6:43 PM John Smith  wrote:

> Ok I used the method suggested by Ali. The error is gone. But now I see
> multiple counts emitted for the same key...
>
> DataStream slStream = env.fromSource(kafkaSource,
>         WatermarkStrategy.noWatermarks(), "Kafka Source")
>     .uid(kafkaTopic).name(kafkaTopic)
>     .setParallelism(kafkaParallelism)
>     // Timestamp in GMT created here, rounded down to the closest minute.
>     .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>     .uid("map-json-logs").name("map-json-logs");
>
> slStream.keyBy(new MinutesKeySelector())
>     // Tumbling window of 1 minute.
>     .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>
>
>
> So below you will see a new count was emitted at 16:51 and 16:55
>
> {"countId":"2022-02-11T16:50:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
> mydomain.com","uri":"/some-article","count":3542}
> -
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16503}
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":70}
> -
>
> {"countId":"2022-02-11T16:52:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16037}
> {"countId":"2022-02-11T16:53:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18679}
> {"countId":"2022-02-11T16:54:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17697}
> -
>
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18066}
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":58}
> -
> {"countId":"2022-02-11T16:56:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17489}
>
>
>
>
> On Mon, Feb 7, 2022 at 12:44 PM John Smith  wrote:
>
>> Ok I think Ali's solution makes the most sense to me. I'll try it and let
>> you know.
>>
>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>>
>>> Hi John,
>>>
>>> your getKey() implementation shows that it is not deterministic, since
>>> calling it with the same click instance multiple times will return
>>> different keys. For example a call at 12:01:59.950 and a call at
>>> 12:02:00.050 with the same click instance will return two different keys:
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>
>>> best regards
>>> Jing
>>>
>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith 
>>> wrote:
>>>
 Maybe there's a misunderstanding, but basically I want to do a clickstream
 count for a given "url" and, for simplicity and accuracy of the count, base
 it on processing time (event time doesn't matter as long as I get a total
 of clicks at that given processing time).

 So regardless of the event time. I want all clicks for the current
 processing time rounded to the minute per link.

 So, if now was 2022-04-07T12:01:00.000Z

 Then I would want the following result...

 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
 
 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
 And so on...

 @Override
 public MyEventCountKey getKey(final MyEvent click) throws Exception
 {
 MyEventCountKey key = new MyEventCountKey(
 Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
 ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
 click.getDomain(), // cnn.com
 click.getPath() // /some-article-name
 );
 return key;
 }



 On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:

> The

Re: How to proper hashCode() for keys.

2022-02-11 Thread John Smith
Ok I used the method suggested by Ali. The error is gone. But now I see
multiple counts emitted for the same key...

DataStream slStream = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "Kafka Source")
.uid(kafkaTopic).name(kafkaTopic)
.setParallelism(kafkaParallelism)
.flatMap(new MapToMyEvent("my-event", windowSizeMins,
"message")) <-- Timestamp in GMT created here rounded to the
closest minute down.
.uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
<-- Tumbling window of 1 minute.



So below you will see a new count was emitted at 16:51 and 16:55

{"countId":"2022-02-11T16:50:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
mydomain.com","uri":"/some-article","count":3542}
-
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":16503}
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":70}
-

{"countId":"2022-02-11T16:52:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
mydomain.com","uri":"/some-article","count":16037}
{"countId":"2022-02-11T16:53:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
mydomain.com","uri":"/some-article","count":18679}
{"countId":"2022-02-11T16:54:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
mydomain.com","uri":"/some-article","count":17697}
-

{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":18066}
{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":58}
-
{"countId":"2022-02-11T16:56:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
mydomain.com","uri":"/some-article","count":17489}
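One plausible explanation for the split counts (e.g. 16:51 appearing twice, with 16503 and then 70): because the key selector reads the wall clock, an element can be keyed with one minute but reach the window operator just after the boundary and land in the next minute's tumbling window, so the old key fires again from that later window with a small straggler count. A minimal, framework-free sketch of the effect (the instants are illustrative and windowSizeMins = 1 is assumed):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class BoundarySplit {
    // Round an instant down to the minute, as both the wall-clock key selector
    // and the processing-time tumbling window effectively do.
    static Instant minuteOf(Instant wallClock) {
        return wallClock.truncatedTo(ChronoUnit.MINUTES);
    }

    public static void main(String[] args) {
        // The same element: the key selector reads the clock just before the
        // minute boundary, the window operator reads it again just after.
        Instant atKeyBy  = Instant.parse("2022-02-11T16:51:59.990Z");
        Instant atWindow = Instant.parse("2022-02-11T16:52:00.010Z");

        System.out.println("key minute:   " + minuteOf(atKeyBy));  // 2022-02-11T16:51:00Z
        System.out.println("window start: " + minuteOf(atWindow)); // 2022-02-11T16:52:00Z
        // The element carries the 16:51 key but lands in the 16:52 window, so the
        // 16:51 key is emitted a second time when that window fires.
    }
}
```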




On Mon, Feb 7, 2022 at 12:44 PM John Smith  wrote:

> Ok I think Ali's solution makes the most sense to me. I'll try it and let
> you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic, since
>> calling it with the same click instance multiple times will return
>> different keys. For example a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>>
>>> Maybe there's a misunderstanding. But basically I want to do clickstream
>>> count for a given "url" and for simplicity and accuracy of the count base
>>> it on processing time (event time doesn't matter as long as I get a total
>>> of clicks at that given processing time)
>>>
>>> So regardless of the event time. I want all clicks for the current
>>> processing time rounded to the minute per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z
>>>
>>> Then I would want the following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>> 
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> And so on...
>>>
>>> @Override
>>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>>> {
>>> MyEventCountKey key = new MyEventCountKey(
>>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>> click.getDomain(), // cnn.com
>>> click.getPath() // /some-article-name
>>> );
>>> return key;
>>> }
>>>
>>>
>>>
>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>>>
 The key selector works.


 No it does not ;) It depends on the system time so it's not
 deterministic (you can get different keys for the very same element).

 How do you key a count based on the time. I have taken this from
> samples online.
>

 This is what the windowing is for. You basically want to group /
 combine elements per key and event time window [1].

 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

 Best,
 D.

 On Mon, Feb 7, 2022 at 3:44 PM John Smith 
 wrote:

> The key selector works. It only causes an issue if there are too many keys

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Ok I think Ali's solution makes the most sense to me. I'll try it and let
you know.

On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:

> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> different keys. For example a call at 12:01:59.950 and a call at
> 12:02:00.050 with the same click instance will return two different keys:
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>
> best regards
> Jing
>
> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>
>> Maybe there's a misunderstanding. But basically I want to do clickstream
>> count for a given "url" and for simplicity and accuracy of the count base
>> it on processing time (event time doesn't matter as long as I get a total
>> of clicks at that given processing time)
>>
>> So regardless of the event time. I want all clicks for the current
>> processing time rounded to the minute per link.
>>
>> So, if now was 2022-04-07T12:01:00.000Z
>>
>> Then I would want the following result...
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>> 
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>> And so on...
>>
>> @Override
>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>> {
>> MyEventCountKey key = new MyEventCountKey(
>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>> click.getDomain(), // cnn.com
>> click.getPath() // /some-article-name
>> );
>> return key;
>> }
>>
>>
>>
>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>>
>>> The key selector works.
>>>
>>>
>>> No it does not ;) It depends on the system time so it's not
>>> deterministic (you can get different keys for the very same element).
>>>
>>> How do you key a count based on the time. I have taken this from samples
 online.

>>>
>>> This is what the windowing is for. You basically want to group / combine
>>> elements per key and event time window [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith 
>>> wrote:
>>>
 The key selector works. It only causes an issue if there are too many keys
 produced in one shot. For example, if 100 "same" keys are produced for that
 1 minute it's ok. But if 101 are produced the error happens.


 If you look at the reproducer, at least that's what's happening

 How do you key a count based on the time. I have taken this from
 samples online.

 The key is that particular time for that particular URL path.

 So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00

 On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
 wrote:

> Your Key selector doesn't need to implement hashCode, but given the
> same object it has to return the same key.
> In your reproducer the returned key will have different timestamps,
> and since the timestamp is included in the hashCode, they will be 
> different
> each time.
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it? I provided the reproducer. I implemented the interface
> to Key selector it needs hashcode and equals as well?
>
> I'm attempting to do click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So these should be valid below?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> "The {@link KeySelector} allows to use deterministic objects for
>> operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
>> multiple times on the same object, the returned key must be the same."
>>
>>
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially it looks like when there's a high influx of records
>> produced from the source that the Exception is thrown.
>>
>> The key is ge

Re: How to proper hashCode() for keys.

2022-02-07 Thread Jing Ge
Hi John,

your getKey() implementation shows that it is not deterministic, since
calling it with the same click instance multiple times will return
different keys. For example a call at 12:01:59.950 and a call at
12:02:00.050 with the same click instance will return two different keys:

2022-04-07T12:01:00.000Z|cnn.com|some-article-name
2022-04-07T12:02:00.000Z|cnn.com|some-article-name

best regards
Jing
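Jing's point can be checked directly with java.time (the timestamps are the ones from the example above; windowSizeMins = 1 is assumed): truncating two wall-clock instants that straddle a minute boundary yields two different key prefixes for the very same click.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class NonDeterministicKey {
    // The time component of the key, as the getKey() in the thread computes it,
    // but taking the "current" instant as a parameter instead of Instant.now().
    static String minutePrefix(Instant now) {
        return now.truncatedTo(ChronoUnit.MINUTES).toString();
    }

    public static void main(String[] args) {
        // Two getKey() invocations on the SAME click, 100 ms apart:
        String first  = minutePrefix(Instant.parse("2022-04-07T12:01:59.950Z"));
        String second = minutePrefix(Instant.parse("2022-04-07T12:02:00.050Z"));
        System.out.println(first);  // 2022-04-07T12:01:00Z
        System.out.println(second); // 2022-04-07T12:02:00Z -- a different key for the same element
    }
}
```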

On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:

> Maybe there's a misunderstanding. But basically I want to do clickstream
> count for a given "url" and for simplicity and accuracy of the count base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> 
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
> MyEventCountKey key = new MyEventCountKey(
> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
> click.getDomain(), // cnn.com
> click.getPath() // /some-article-name
> );
> return key;
> }
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>
>> The key selector works.
>>
>>
>> No it does not ;) It depends on the system time so it's not deterministic
>> (you can get different keys for the very same element).
>>
>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>>
>>> The key selector works. It only causes an issue if there are too many keys
>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>> 1 minute it's ok. But if 101 are produced the error happens.
>>>
>>>
>>> If you look at the reproducer, at least that's what's happening
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Your Key selector doesn't need to implement hashCode, but given the
 same object it has to return the same key.
 In your reproducer the returned key will have different timestamps, and
 since the timestamp is included in the hashCode, they will be different
 each time.

 On 07/02/2022 14:50, John Smith wrote:

 I don't get it? I provided the reproducer. I implemented the interface
 to Key selector it needs hashcode and equals as well?

 I'm attempting to do click stream. So the key is based on processing
 date/time rounded to the minute + domain name + path

 So these should be valid below?

 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1

 2022-01-01T10:02:00 + cnn.com + /article2

 2022-01-01T10:03:00 + cnn.com + /article1
 2022-01-01T10:03:00 + cnn.com + /article1

 2022-01-01T10:03:00 + cnn.com + /article3
 2022-01-01T10:03:00 + cnn.com + /article3

 On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:

> Don't KeySelectors also need to be deterministic?
>
> "The {@link KeySelector} allows to use deterministic objects for
> operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
> multiple times on the same object, the returned key must be the same."
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the minute and
> 2 strings.
> So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces a

Re: How to proper hashCode() for keys.

2022-02-07 Thread Ali Bahadir Zeybek
Hello John,

During the lifecycle of the execution for a given event, the key information
is not passed between operators; instead, it is recomputed from the given
key selector every time a (keyed) operator sees the event. Therefore, the
same event, within the same pipeline, could be assigned a different key
while moving along the graph of operators. This part of your key selector is
not deterministic since it depends on the time at which the key selector
function is executed. My suggestion would be to materialise the key as an
additional field on your event at the beginning of the pipeline and then use
that field as the key.

Sincerely,

Ali
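A minimal sketch of that suggestion, with the Flink wiring omitted (the simplified MyEvent POJO, the keyMinute field, and windowSizeMins = 1 are assumptions based on the code shared earlier in the thread): stamp the rounded processing time onto the event once, in a map right after the source, so every later keyed operator reads the same stored value instead of the wall clock.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class KeyMaterializer {
    // Simplified stand-in for the MyEvent POJO from the thread.
    public static class MyEvent {
        String domain, path, keyMinute; // keyMinute is the materialised key field
    }

    // Run once at the head of the pipeline (e.g. inside a MapFunction),
    // so the wall clock is read exactly one time per event.
    public static MyEvent stampKeyMinute(MyEvent e, Instant processingTime) {
        e.keyMinute = processingTime.truncatedTo(ChronoUnit.MINUTES).toString();
        return e;
    }

    // The key selector becomes deterministic: it only reads stored fields,
    // so repeated invocations on the same event return the same key.
    public static String key(MyEvent e) {
        return e.keyMinute + "|" + e.domain + "|" + e.path;
    }
}
```

In the Flink job this would be a MapFunction applied right after the source, with the KeySelector simply returning key(event); because the selector no longer calls Instant.now(), it satisfies the determinism requirement quoted from the KeySelector javadoc.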

On Mon, Feb 7, 2022 at 7:06 PM John Smith  wrote:

> Maybe there's a misunderstanding. But basically I want to do clickstream
> count for a given "url" and for simplicity and accuracy of the count base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> 
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
> MyEventCountKey key = new MyEventCountKey(
> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
> click.getDomain(), // cnn.com
> click.getPath() // /some-article-name
> );
> return key;
> }
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>
>> The key selector works.
>>
>>
>> No it does not ;) It depends on the system time so it's not deterministic
>> (you can get different keys for the very same element).
>>
>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>>
>>> The key selector works. It only causes an issue if there are too many keys
>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>> 1 minute it's ok. But if 101 are produced the error happens.
>>>
>>>
>>> If you look at the reproducer, at least that's what's happening
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Your Key selector doesn't need to implement hashCode, but given the
 same object it has to return the same key.
 In your reproducer the returned key will have different timestamps, and
 since the timestamp is included in the hashCode, they will be different
 each time.

 On 07/02/2022 14:50, John Smith wrote:

 I don't get it? I provided the reproducer. I implemented the interface
 to Key selector it needs hashcode and equals as well?

 I'm attempting to do click stream. So the key is based on processing
 date/time rounded to the minute + domain name + path

 So these should be valid below?

 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1

 2022-01-01T10:02:00 + cnn.com + /article2

 2022-01-01T10:03:00 + cnn.com + /article1
 2022-01-01T10:03:00 + cnn.com + /article1

 2022-01-01T10:03:00 + cnn.com + /article3
 2022-01-01T10:03:00 + cnn.com + /article3

 On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:

> Don't KeySelectors also need to be deterministic?
>
> "The {@link KeySelector} allows to use deterministic objects for
> operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
> multiple times on the same object, the returned key must be the same."
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Maybe there's a misunderstanding. But basically I want to do clickstream
count for a given "url" and for simplicity and accuracy of the count base
it on processing time (event time doesn't matter as long as I get a total
of clicks at that given processing time)

So regardless of the event time. I want all clicks for the current
processing time rounded to the minute per link.

So, if now was 2022-04-07T12:01:00.000Z

Then I would want the following result...

2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15

2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
And so on...

@Override
public MyEventCountKey getKey(final MyEvent click) throws Exception
{
MyEventCountKey key = new MyEventCountKey(
Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
click.getDomain(), // cnn.com
click.getPath() // /some-article-name
);
return key;
}



On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:

> The key selector works.
>
>
> No it does not ;) It depends on the system time so it's not deterministic
> (you can get different keys for the very same element).
>
> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>
> This is what the windowing is for. You basically want to group / combine
> elements per key and event time window [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>
> Best,
> D.
>
> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>
>> The key selector works. It only causes an issue if there are too many keys
>> produced in one shot. For example, if 100 "same" keys are produced for that
>> 1 minute it's ok. But if 101 are produced the error happens.
>>
>>
>> If you look at the reproducer, at least that's what's happening
>>
>> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>> The key is that particular time for that particular URL path.
>>
>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>
>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>> wrote:
>>
>>> Your Key selector doesn't need to implement hashCode, but given the same
>>> object it has to return the same key.
>>> In your reproducer the returned key will have different timestamps, and
>>> since the timestamp is included in the hashCode, they will be different
>>> each time.
>>>
>>> On 07/02/2022 14:50, John Smith wrote:
>>>
>>> I don't get it? I provided the reproducer. I implemented the interface
>>> to Key selector it needs hashcode and equals as well?
>>>
>>> I'm attempting to do click stream. So the key is based on processing
>>> date/time rounded to the minute + domain name + path
>>>
>>> So these should be valid below?
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>
>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Don't KeySelectors also need to be deterministic?

 "The {@link KeySelector} allows to use deterministic objects for
 operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
 multiple times on the same object, the returned key must be the same."


 On 04/02/2022 18:25, John Smith wrote:

 Hi Francesco,  here is the reproducer:
 https://github.com/javadevmtl/flink-key-reproducer

 So, essentially it looks like when there's a high influx of records
 produced from the source that the Exception is thrown.

 The key is generated by 3 values: date/time rounded to the minute and 2
 strings.
 So you will see keys as follows...
 2022-02-04T17:20:00Z|foo|bar
 2022-02-04T17:21:00Z|foo|bar
 2022-02-04T17:22:00Z|foo|bar

 The reproducer has a custom source that basically produces a record in
 a loop and sleeps for a specified period of milliseconds 100ms in this 
 case.
 The lower the sleep delay the faster records are produced the more
 chances the exception is thrown. With a 100ms delay it's always thrown.
 Setting a 2000 to 3000ms will guarantee it to work.
 The original job uses a Kafka Source so it should technically be able
 to handle even a couple thousand records per second.


 On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:

> Ok it's not my data either. I think it may be a volume issue. I have
> managed to consistently reprod

Re: How to proper hashCode() for keys.

2022-02-07 Thread David Morávek
>
> The key selector works.


No it does not ;) It depends on the system time so it's not deterministic
(you can get different keys for the very same element).

How do you key a count based on the time. I have taken this from samples
> online.
>

This is what the windowing is for. You basically want to group / combine
elements per key and event time window [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

Best,
D.
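What the suggested keyBy + window combination does can be sketched without Flink (a conceptual model only; in the real job TumblingProcessingTimeWindows assigns the minute, not the key selector): key on the stable fields (domain, path) and let the window contribute the time dimension.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConceptualTumblingWindow {
    // Count clicks per (window-start minute, domain|path). The window assigner,
    // not the key selector, supplies the minute -- so the key stays deterministic.
    static Map<String, Integer> countPerWindow(Instant[] arrivalTimes, String[] keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            String windowStart = arrivalTimes[i].truncatedTo(ChronoUnit.MINUTES).toString();
            counts.merge(windowStart + "|" + keys[i], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Instant[] t = {
            Instant.parse("2022-04-07T12:01:10Z"),
            Instant.parse("2022-04-07T12:01:40Z"),
            Instant.parse("2022-04-07T12:02:05Z"),
        };
        String[] k = {
            "cnn.com|/some-article",
            "cnn.com|/some-article",
            "cnn.com|/some-article",
        };
        System.out.println(countPerWindow(t, k));
    }
}
```

In Flink terms: keyBy((domain, path)) followed by window(TumblingProcessingTimeWindows.of(Time.minutes(1))) and an aggregate/process function, where the window metadata provides the timestamp for the output record.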

On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:

> The key selector works. It only causes an issue if there are too many keys
> produced in one shot. For example, if 100 "same" keys are produced for that
> 1 minute it's ok. But if 101 are produced the error happens.
>
>
> If you look at the reproducer, at least that's what's happening
>
> How do you key a count based on the time. I have taken this from samples
> online.
>
> The key is that particular time for that particular URL path.
>
> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>
> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
> wrote:
>
>> Your Key selector doesn't need to implement hashCode, but given the same
>> object it has to return the same key.
>> In your reproducer the returned key will have different timestamps, and
>> since the timestamp is included in the hashCode, they will be different
>> each time.
>>
>> On 07/02/2022 14:50, John Smith wrote:
>>
>> I don't get it? I provided the reproducer. I implemented the interface to
>> Key selector it needs hashcode and equals as well?
>>
>> I'm attempting to do click stream. So the key is based on processing
>> date/time rounded to the minute + domain name + path
>>
>> So these should be valid below?
>>
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>>
>> 2022-01-01T10:02:00 + cnn.com + /article2
>>
>> 2022-01-01T10:03:00 + cnn.com + /article1
>> 2022-01-01T10:03:00 + cnn.com + /article1
>>
>> 2022-01-01T10:03:00 + cnn.com + /article3
>> 2022-01-01T10:03:00 + cnn.com + /article3
>>
>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
>> wrote:
>>
>>> Don't KeySelectors also need to be deterministic?
>>>
>>> "The {@link KeySelector} allows to use deterministic objects for
>>> operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
>>> multiple times on the same object, the returned key must be the same."
>>>
>>>
>>> On 04/02/2022 18:25, John Smith wrote:
>>>
>>> Hi Francesco,  here is the reproducer:
>>> https://github.com/javadevmtl/flink-key-reproducer
>>>
>>> So, essentially it looks like when there's a high influx of records
>>> produced from the source that the Exception is thrown.
>>>
>>> The key is generated by 3 values: date/time rounded to the minute and 2
>>> strings.
>>> So you will see keys as follows...
>>> 2022-02-04T17:20:00Z|foo|bar
>>> 2022-02-04T17:21:00Z|foo|bar
>>> 2022-02-04T17:22:00Z|foo|bar
>>>
>>> The reproducer has a custom source that basically produces a record in a
>>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>>> The lower the sleep delay the faster records are produced the more
>>> chances the exception is thrown. With a 100ms delay it's always thrown.
>>> Setting a 2000 to 3000ms will guarantee it to work.
>>> The original job uses a Kafka Source so it should technically be able to
>>> handle even a couple thousand records per second.
>>>
>>>
>>> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>>>
 Ok it's not my data either. I think it may be a volume issue. I have
 managed to consistently reproduce the error. I'll upload a reproducer ASAP.



 On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:

> Ok so I tried to create a reproducer but I couldn't reproduce it. But
> the actual job once in a while throws that error. So I'm wondering if 
> maybe
> one of the records that comes in is not valid, though I do validate prior
> to getting to the key and window operators.
>
> On Thu, 3 Feb 2022 at 14:32, John Smith 
> wrote:
>
>> Actually maybe not because with PrintSinkFunction it ran for a bit
>> and then it threw the error.
>>
>> On Thu, 3 Feb 2022 at 14:24, John Smith 
>> wrote:
>>
>>> Ok it may be the ElasticSearch connector causing the issue?
>>>
>>> If I use PrintSinkFunction then I get no error and my stats print as
>>> expected.
>>>
>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
 Hi,
 your hash code and equals seems correct. Can you post a minimum
 stream pipeline reproducer using this class?

 FG

 On Tue, Feb 1, 2022 at 8:39 PM John Smith 
 wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
The key selector works. It only causes an issue if there are too many keys
produced in one shot. For example, if 100 "same" keys are produced for that
1 minute it's ok. But if 101 are produced the error happens.


If you look at the reproducer, at least that's what's happening

How do you key a count based on the time. I have taken this from samples
online.

The key is that particular time for that particular URL path.

So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00

On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
wrote:

> Your Key selector doesn't need to implement hashCode, but given the same
> object it has to return the same key.
> In your reproducer the returned key will have different timestamps, and
> since the timestamp is included in the hashCode, they will be different
> each time.
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it? I provided the reproducer. I implemented the interface to
> Key selector it needs hashcode and equals as well?
>
> I'm attempting to do click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So these should be valid below?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> * The {@link KeySelector} allows to use deterministic objects for operations
>> * such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
>> * on the same object, the returned key must be the same.
>>
>>
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially it looks like when there's a high influx of records
>> produced from the source that the Exception is thrown.
>>
>> The key is generated by 3 values: date/time rounded to the minute and 2
>> strings.
>> So you will see keys as follows...
>> 2022-02-04T17:20:00Z|foo|bar
>> 2022-02-04T17:21:00Z|foo|bar
>> 2022-02-04T17:22:00Z|foo|bar
>>
>> The reproducer has a custom source that basically produces a record in a
>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>> The lower the sleep delay the faster records are produced the more
>> chances the exception is thrown. With a 100ms delay it's always thrown.
>> Setting a 2000 to 3000ms will guarantee it to work.
>> The original job uses a Kafka Source so it should technically be able to
>> handle even a couple thousand records per second.
>>
>>
>> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>>
>>> Ok it's not my data either. I think it may be a volume issue. I have
>>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>>
>>>
>>>
>>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>>
 Ok so I tried to create a reproducer but I couldn't reproduce it. But
 the actual job once in a while throws that error. So I'm wondering if maybe
 one of the records that comes in is not valid, though I do validate prior
 to getting to the key and window operators.

 On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:

> Actually maybe not because with PrintSinkFunction it ran for a bit and
> then it threw the error.
>
> On Thu, 3 Feb 2022 at 14:24, John Smith 
> wrote:
>
>> Ok it may be the ElasticSearch connector causing the issue?
>>
>> If I use PrintSinkFunction then I get no error and my stats print as
>> expected.
>>
>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> Hi,
>>> your hash code and equals seems correct. Can you post a minimum
>>> stream pipeline reproducer using this class?
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>> wrote:
>>>
 Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
 in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
 directly
 using low level state access APIs, this is most likely caused by
 non-deterministic shuffle key (hashCode and equals implementation).

 This is my class, is my hashCode deterministic?

public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

pu

Re: How to proper hashCode() for keys.

2022-02-07 Thread Chesnay Schepler
Your Key selector doesn't need to implement hashCode, but given the same 
object it has to return the same key.
In your reproducer the returned key will have different timestamps, and 
since the timestamp is included in the hashCode, they will be different 
each time.
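The non-determinism described here can be shown in a stand-alone sketch (plain Java, no Flink dependency; the class and method names are illustrative, not the reproducer's actual code). A key selector that folds the wall-clock minute into the key returns a different key when invoked on the same record at a different time, and therefore a different hashCode:

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class NonDeterministicKeyDemo {

    // The Clock parameter stands in for System.currentTimeMillis():
    // the wall-clock minute leaks into the key.
    static String selectKey(String domain, String path, Clock clock) {
        Instant minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domain + "|" + path;
    }

    public static void main(String[] args) {
        // The same record, keyed twice, 35 seconds apart across a minute boundary.
        Clock t1 = Clock.fixed(Instant.parse("2022-02-04T17:20:30Z"), ZoneOffset.UTC);
        Clock t2 = Clock.fixed(Instant.parse("2022-02-04T17:21:05Z"), ZoneOffset.UTC);
        String k1 = selectKey("cnn.com", "/article1", t1);
        String k2 = selectKey("cnn.com", "/article1", t2);
        System.out.println(k1);
        System.out.println(k2);
        System.out.println(k1.equals(k2)); // false: two invocations, two keys
    }
}
```

Because Flink may re-evaluate the KeySelector on the same record, the two invocations must agree; here they do not once the minute rolls over.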


On 07/02/2022 14:50, John Smith wrote:
I don't get it. I provided the reproducer. I implemented the KeySelector
interface; doesn't it need hashCode and equals as well?


I'm attempting to do click stream. So the key is based on processing 
date/time rounded to the minute + domain name + path


So these should be valid below?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:


Don't KeySelectors also need to be deterministic?

* The {@link KeySelector} allows to use deterministic objects for
* operations such as reduce, reduceGroup, join, coGroup, etc. If
* invoked multiple times on the same object, the returned key
* must be the same.


On 04/02/2022 18:25, John Smith wrote:

Hi Francesco,  here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer

So, essentially it looks like when there's a high influx of
records produced from the source that the Exception is thrown.

The key is generated by 3 values: date/time rounded to the minute
and 2 strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a
record in a loop and sleeps for a specified period of
milliseconds 100ms in this case.
The lower the sleep delay the faster records are produced the
more chances the exception is thrown. With a 100ms delay it's
always thrown. Setting a 2000 to 3000ms will guarantee it to work.
The original job uses a Kafka Source so it should technically be
able to handle even a couple thousand records per second.


On Thu, 3 Feb 2022 at 16:41, John Smith 
wrote:

Ok it's not my data either. I think it may be a volume issue.
I have managed to consistently reproduce the error. I'll
upload a reproducer ASAP.



On Thu, 3 Feb 2022 at 15:37, John Smith
 wrote:

Ok so I tried to create a reproducer but I couldn't
reproduce it. But the actual job once in a while throws
that error. So I'm wondering if maybe one of the records
that comes in is not valid, though I do validate prior to
getting to the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith
 wrote:

Actually maybe not because with PrintSinkFunction it
ran for a bit and then it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith
 wrote:

Ok it may be the ElasticSearch connector causing
the issue?

If I use PrintSinkFunction then I get no error
and my stats print as expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
 wrote:

Hi,
your hash code and equals seems correct. Can
you post a minimum stream pipeline reproducer
using this class?

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith
 wrote:

Hi, getting
java.lang.IllegalArgumentException: Key
group 39 is not in
KeyGroupRange{startKeyGroup=96,
endKeyGroup=103}. Unless you're directly
using low level state access APIs, this
is most likely caused by
non-deterministic shuffle key (hashCode
and equals implementation).

This is my class, is my hashCode
deterministic?

public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
I don't get it. I provided the reproducer. I implemented the KeySelector
interface; doesn't it need hashCode and equals as well?

I'm attempting to do click stream. So the key is based on processing
date/time rounded to the minute + domain name + path

So these should be valid below?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
wrote:

> Don't KeySelectors also need to be deterministic?
>
> * The {@link KeySelector} allows to use deterministic objects for operations
> * such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
> * on the same object, the returned key must be the same.
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the minute and 2
> strings.
> So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces a record in a
> loop and sleeps for a specified period of milliseconds 100ms in this case.
> The lower the sleep delay the faster records are produced the more chances
> the exception is thrown. With a 100ms delay it's always thrown. Setting a
> 2000 to 3000ms will guarantee it to work.
> The original job uses a Kafka Source so it should technically be able to
> handle even a couple thousand records per second.
>
>
> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>
>> Ok it's not my data either. I think it may be a volume issue. I have
>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>
>>
>>
>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>
>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>> the actual job once in a while throws that error. So I'm wondering if maybe
>>> one of the records that comes in is not valid, though I do validate prior
>>> to getting to the key and window operators.
>>>
>>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>>
 Actually maybe not because with PrintSinkFunction it ran for a bit and
 then it threw the error.

 On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:

> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>> your hash code and equals seems correct. Can you post a minimum
>> stream pipeline reproducer using this class?
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>> wrote:
>>
>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
>>> directly
>>> using low level state access APIs, this is most likely caused by
>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>
>>> This is my class, is my hashCode deterministic?
>>>
>>> public final class MyEventCountKey {
>>>     private final String countDateTime;
>>>     private final String domain;
>>>     private final String event;
>>>
>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>         this.countDateTime = countDateTime;
>>>         this.domain = domain;
>>>         this.event = event;
>>>     }
>>>
>>>     public String getCountDateTime() {
>>>         return countDateTime;
>>>     }
>>>
>>>     public String getDomain() {
>>>         return domain;
>>>     }
>>>
>>>     public String getEven() {
>>>         return event;
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return countDateTime + "|" + domain + "|" + event;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>         return countDateTime.equals(that.countDateTime) &&
>>>                 domain.equals(that.domain) &&
>>>                 event.equals(that.event);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         final int prime = 31;
>>>         int result = 1;
>>>         result = prime * result + countDateTime.hashCode();
>>>         result

Re: How to proper hashCode() for keys.

2022-02-06 Thread Chesnay Schepler

Don't KeySelectors also need to be deterministic?

* The {@link KeySelector} allows to use deterministic objects for
* operations such as reduce, reduceGroup, join, coGroup, etc. If
* invoked multiple times on the same object, the returned key must be
* the same.
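The "Key group X is not in KeyGroupRange" error follows from how Flink routes keys: each key is assigned to a key group, and each operator instance owns a contiguous range of groups. The sketch below is a simplification under stated assumptions: the real assignment in Flink's KeyGroupRangeAssignment additionally applies a murmur hash to key.hashCode() before the modulo, which is omitted here. If the KeySelector returns a different key when Flink re-evaluates it on the same record, the recomputed group can fall outside the range owned by the instance that received the record:

```java
public class KeyGroupSketch {
    static final int MAX_PARALLELISM = 128;

    // Simplified stand-in for Flink's key-group assignment
    // (Flink murmur-hashes key.hashCode() first; plain modulo here).
    static int keyGroup(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        // Same logical record, but the wall-clock minute rolled over
        // between the shuffle and the state access.
        String keyAtShuffleTime = "2022-02-04T17:20:00Z|foo|bar";
        String keyAtStateAccess = "2022-02-04T17:21:00Z|foo|bar";
        // The two strings hash differently, so they will generally land in
        // different key groups, tripping the KeyGroupRange check.
        System.out.println(keyGroup(keyAtShuffleTime));
        System.out.println(keyGroup(keyAtStateAccess));
    }
}
```

A deterministic key (no timestamp component, or an event-time component fixed per record) always maps to the same group, which is what the range check assumes.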



On 04/02/2022 18:25, John Smith wrote:
Hi Francesco,  here is the reproducer: 
https://github.com/javadevmtl/flink-key-reproducer


So, essentially it looks like when there's a high influx of records 
produced from the source that the Exception is thrown.


The key is generated by 3 values: date/time rounded to the minute and 
2 strings.

So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a record in 
a loop and sleeps for a specified period of milliseconds 100ms in this 
case.
The lower the sleep delay the faster records are produced the more 
chances the exception is thrown. With a 100ms delay it's always 
thrown. Setting a 2000 to 3000ms will guarantee it to work.
The original job uses a Kafka Source so it should technically be able 
to handle even a couple thousand records per second.



On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:

Ok it's not my data either. I think it may be a volume issue. I
have managed to consistently reproduce the error. I'll upload a
reproducer ASAP.



On Thu, 3 Feb 2022 at 15:37, John Smith 
wrote:

Ok so I tried to create a reproducer but I couldn't reproduce
it. But the actual job once in a while throws that error. So
I'm wondering if maybe one of the records that comes in is not
valid, though I do validate prior to getting to the key and
window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith
 wrote:

Actually maybe not because with PrintSinkFunction it ran
for a bit and then it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith
 wrote:

Ok it may be the ElasticSearch connector causing the
issue?

If I use PrintSinkFunction then I get no error and my
stats print as expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
 wrote:

Hi,
your hash code and equals seems correct. Can you
post a minimum stream pipeline reproducer using
this class?

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith
 wrote:

Hi, getting
java.lang.IllegalArgumentException: Key group
39 is not in KeyGroupRange{startKeyGroup=96,
endKeyGroup=103}. Unless you're directly using
low level state access APIs, this is most
likely caused by non-deterministic shuffle key
(hashCode and equals implementation).

This is my class, is my hashCode deterministic?

public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

    public String getCountDateTime() {
        return countDateTime;
    }

    public String getDomain() {
        return domain;
    }

    public String getEven() {
        return event;
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyEventCountKey that = (MyEventCountKey) o;
        return countDateTime.equals(that.countDateTime) &&
                domain.equals(that.domain) &&
                event.equals(that.event);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }
}



Re: How to proper hashCode() for keys.

2022-02-04 Thread John Smith
Hi Francesco,  here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer

So, essentially it looks like when there's a high influx of records
produced from the source that the Exception is thrown.

The key is generated by 3 values: date/time rounded to the minute and 2
strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a record in a
loop and sleeps for a specified period of milliseconds 100ms in this case.
The lower the sleep delay the faster records are produced the more chances
the exception is thrown. With a 100ms delay it's always thrown. Setting a
2000 to 3000ms will guarantee it to work.
The original job uses a Kafka Source so it should technically be able to
handle even a couple thousand records per second.
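The throttled source described above can be sketched as a plain loop (no Flink dependency; the Consumer-based collector and all names are illustrative assumptions, not the reproducer's actual API). The sleep delay is the knob that controls the record rate and hence how likely the key rolls over mid-flight:

```java
import java.util.function.Consumer;

public class ThrottledSource {

    // Emit one record per iteration, sleeping between emissions.
    // A lower sleepMillis means a higher record rate.
    public static void run(int records, long sleepMillis, Consumer<String> collector)
            throws InterruptedException {
        for (int i = 0; i < records; i++) {
            collector.accept("2022-02-04T17:20:00Z|foo|bar");
            Thread.sleep(sleepMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 5 records ~10ms apart; the reproducer reportedly uses 100ms,
        // with 2000-3000ms making the error disappear.
        run(5, 10, System.out::println);
    }
}
```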


On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:

> Ok it's not my data either. I think it may be a volume issue. I have
> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>
>
>
> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>
>> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
>> actual job once in a while throws that error. So I'm wondering if maybe one
>> of the records that comes in is not valid, though I do validate prior to
>> getting to the key and window operators.
>>
>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>
>>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>>> then it threw the error.
>>>
>>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>>
 Ok it may be the ElasticSearch connector causing the issue?

 If I use PrintSinkFunction then I get no error and my stats print as
 expected.

 On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
 france...@ververica.com> wrote:

> Hi,
> your hash code and equals seems correct. Can you post a minimum stream
> pipeline reproducer using this class?
>
> FG
>
> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
> wrote:
>
>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
>> directly
>> using low level state access APIs, this is most likely caused by
>> non-deterministic shuffle key (hashCode and equals implementation).
>>
>> This is my class, is my hashCode deterministic?
>>
>> public final class MyEventCountKey {
>> private final String countDateTime;
>> private final String domain;
>> private final String event;
>>
>> public MyEventCountKey(final String countDateTime, final String 
>> domain, final String event) {
>> this.countDateTime = countDateTime;
>> this.domain = domain;
>> this.event = event;
>> }
>>
>> public String getCountDateTime() {
>> return countDateTime;
>> }
>>
>> public String getDomain() {
>> return domain;
>> }
>>
>> public String getEven() {
>> return event;
>> }
>>
>> @Override
>> public String toString() {
>> return countDateTime + "|" + domain + "|" + event;
>> }
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> MyEventCountKey that = (MyEventCountKey) o;
>> return countDateTime.equals(that.countDateTime) &&
>> domain.equals(that.domain) &&
>> event.equals(that.event);
>> }
>>
>> @Override
>> public int hashCode() {
>> final int prime = 31;
>> int result = 1;
>> result = prime * result + countDateTime.hashCode();
>> result = prime * result + domain.hashCode();
>> result = prime * result +  event.hashCode();
>> return result;
>> }
>> }
>>
>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it's not my data either. I think it may be a volume issue. I have
managed to consistently reproduce the error. I'll upload a reproducer ASAP.



On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:

> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
> actual job once in a while throws that error. So I'm wondering if maybe one
> of the records that comes in is not valid, though I do validate prior to
> getting to the key and window operators.
>
> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>
>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>> then it threw the error.
>>
>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>
>>> Ok it may be the ElasticSearch connector causing the issue?
>>>
>>> If I use PrintSinkFunction then I get no error and my stats print as
>>> expected.
>>>
>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
 Hi,
 your hash code and equals seems correct. Can you post a minimum stream
 pipeline reproducer using this class?

 FG

 On Tue, Feb 1, 2022 at 8:39 PM John Smith 
 wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
> using low level state access APIs, this is most likely caused by
> non-deterministic shuffle key (hashCode and equals implementation).
>
> This is my class, is my hashCode deterministic?
>
> public final class MyEventCountKey {
> private final String countDateTime;
> private final String domain;
> private final String event;
>
> public MyEventCountKey(final String countDateTime, final String 
> domain, final String event) {
> this.countDateTime = countDateTime;
> this.domain = domain;
> this.event = event;
> }
>
> public String getCountDateTime() {
> return countDateTime;
> }
>
> public String getDomain() {
> return domain;
> }
>
> public String getEven() {
> return event;
> }
>
> @Override
> public String toString() {
> return countDateTime + "|" + domain + "|" + event;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> MyEventCountKey that = (MyEventCountKey) o;
> return countDateTime.equals(that.countDateTime) &&
> domain.equals(that.domain) &&
> event.equals(that.event);
> }
>
> @Override
> public int hashCode() {
> final int prime = 31;
> int result = 1;
> result = prime * result + countDateTime.hashCode();
> result = prime * result + domain.hashCode();
> result = prime * result +  event.hashCode();
> return result;
> }
> }
>
>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok so I tried to create a reproducer but I couldn't reproduce it. But the
actual job once in a while throws that error. So I'm wondering if maybe one
of the records that comes in is not valid, though I do validate prior to
getting to the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:

> Actually maybe not because with PrintSinkFunction it ran for a bit and
> then it threw the error.
>
> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>
>> Ok it may be the ElasticSearch connector causing the issue?
>>
>> If I use PrintSinkFunction then I get no error and my stats print as
>> expected.
>>
>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
>> wrote:
>>
>>> Hi,
>>> your hash code and equals seems correct. Can you post a minimum stream
>>> pipeline reproducer using this class?
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>> wrote:
>>>
 Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
 KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
 using low level state access APIs, this is most likely caused by
 non-deterministic shuffle key (hashCode and equals implementation).

 This is my class, is my hashCode deterministic?

 public final class MyEventCountKey {
 private final String countDateTime;
 private final String domain;
 private final String event;

 public MyEventCountKey(final String countDateTime, final String 
 domain, final String event) {
 this.countDateTime = countDateTime;
 this.domain = domain;
 this.event = event;
 }

 public String getCountDateTime() {
 return countDateTime;
 }

 public String getDomain() {
 return domain;
 }

 public String getEven() {
 return event;
 }

 @Override
 public String toString() {
 return countDateTime + "|" + domain + "|" + event;
 }

 @Override
 public boolean equals(Object o) {
 if (this == o) return true;
 if (o == null || getClass() != o.getClass()) return false;
 MyEventCountKey that = (MyEventCountKey) o;
 return countDateTime.equals(that.countDateTime) &&
 domain.equals(that.domain) &&
 event.equals(that.event);
 }

 @Override
 public int hashCode() {
 final int prime = 31;
 int result = 1;
 result = prime * result + countDateTime.hashCode();
 result = prime * result + domain.hashCode();
 result = prime * result +  event.hashCode();
 return result;
 }
 }




Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Actually maybe not because with PrintSinkFunction it ran for a bit and then
it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:

> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
> wrote:
>
>> Hi,
>> your hash code and equals seems correct. Can you post a minimum stream
>> pipeline reproducer using this class?
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>>
>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>> using low level state access APIs, this is most likely caused by
>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>
>>> This is my class, is my hashCode deterministic?
>>>
>>> public final class MyEventCountKey {
>>> private final String countDateTime;
>>> private final String domain;
>>> private final String event;
>>>
>>> public MyEventCountKey(final String countDateTime, final String domain, 
>>> final String event) {
>>> this.countDateTime = countDateTime;
>>> this.domain = domain;
>>> this.event = event;
>>> }
>>>
>>> public String getCountDateTime() {
>>> return countDateTime;
>>> }
>>>
>>> public String getDomain() {
>>> return domain;
>>> }
>>>
>>> public String getEven() {
>>> return event;
>>> }
>>>
>>> @Override
>>> public String toString() {
>>> return countDateTime + "|" + domain + "|" + event;
>>> }
>>>
>>> @Override
>>> public boolean equals(Object o) {
>>> if (this == o) return true;
>>> if (o == null || getClass() != o.getClass()) return false;
>>> MyEventCountKey that = (MyEventCountKey) o;
>>> return countDateTime.equals(that.countDateTime) &&
>>> domain.equals(that.domain) &&
>>> event.equals(that.event);
>>> }
>>>
>>> @Override
>>> public int hashCode() {
>>> final int prime = 31;
>>> int result = 1;
>>> result = prime * result + countDateTime.hashCode();
>>> result = prime * result + domain.hashCode();
>>> result = prime * result +  event.hashCode();
>>> return result;
>>> }
>>> }
>>>
>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it may be the ElasticSearch connector causing the issue?

If I use PrintSinkFunction then I get no error and my stats print as
expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
wrote:

> Hi,
> your hash code and equals seems correct. Can you post a minimum stream
> pipeline reproducer using this class?
>
> FG
>
> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>
>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>> using low level state access APIs, this is most likely caused by
>> non-deterministic shuffle key (hashCode and equals implementation).
>>
>> This is my class, is my hashCode deterministic?
>>
>> public final class MyEventCountKey {
>> private final String countDateTime;
>> private final String domain;
>> private final String event;
>>
>> public MyEventCountKey(final String countDateTime, final String domain, 
>> final String event) {
>> this.countDateTime = countDateTime;
>> this.domain = domain;
>> this.event = event;
>> }
>>
>> public String getCountDateTime() {
>> return countDateTime;
>> }
>>
>> public String getDomain() {
>> return domain;
>> }
>>
>> public String getEven() {
>> return event;
>> }
>>
>> @Override
>> public String toString() {
>> return countDateTime + "|" + domain + "|" + event;
>> }
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> MyEventCountKey that = (MyEventCountKey) o;
>> return countDateTime.equals(that.countDateTime) &&
>> domain.equals(that.domain) &&
>> event.equals(that.event);
>> }
>>
>> @Override
>> public int hashCode() {
>> final int prime = 31;
>> int result = 1;
>> result = prime * result + countDateTime.hashCode();
>> result = prime * result + domain.hashCode();
>> result = prime * result +  event.hashCode();
>> return result;
>> }
>> }
>>
>>


Re: How to proper hashCode() for keys.

2022-02-02 Thread Francesco Guardiani
Hi,
your hashCode and equals seem correct. Can you post a minimum stream
pipeline reproducer using this class?

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
> using low level state access APIs, this is most likely caused by
> non-deterministic shuffle key (hashCode and equals implementation).
>
> This is my class, is my hashCode deterministic?
>
> public final class MyEventCountKey {
> private final String countDateTime;
> private final String domain;
> private final String event;
>
> public MyEventCountKey(final String countDateTime, final String domain, 
> final String event) {
> this.countDateTime = countDateTime;
> this.domain = domain;
> this.event = event;
> }
>
> public String getCountDateTime() {
> return countDateTime;
> }
>
> public String getDomain() {
> return domain;
> }
>
> public String getEven() {
> return event;
> }
>
> @Override
> public String toString() {
> return countDateTime + "|" + domain + "|" + event;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> MyEventCountKey that = (MyEventCountKey) o;
> return countDateTime.equals(that.countDateTime) &&
> domain.equals(that.domain) &&
> event.equals(that.event);
> }
>
> @Override
> public int hashCode() {
> final int prime = 31;
> int result = 1;
> result = prime * result + countDateTime.hashCode();
> result = prime * result + domain.hashCode();
> result = prime * result +  event.hashCode();
> return result;
> }
> }
>
>


How to proper hashCode() for keys.

2022-02-01 Thread John Smith
Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
using low level state access APIs, this is most likely caused by
non-deterministic shuffle key (hashCode and equals implementation).

This is my class, is my hashCode deterministic?

public final class MyEventCountKey {
private final String countDateTime;
private final String domain;
private final String event;

public MyEventCountKey(final String countDateTime, final String
domain, final String event) {
this.countDateTime = countDateTime;
this.domain = domain;
this.event = event;
}

public String getCountDateTime() {
return countDateTime;
}

public String getDomain() {
return domain;
}

public String getEven() {
return event;
}

@Override
public String toString() {
return countDateTime + "|" + domain + "|" + event;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MyEventCountKey that = (MyEventCountKey) o;
return countDateTime.equals(that.countDateTime) &&
domain.equals(that.domain) &&
event.equals(that.event);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + countDateTime.hashCode();
result = prime * result + domain.hashCode();
result = prime * result +  event.hashCode();
return result;
}
}
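For fixed field values the hashCode above is indeed deterministic, which matches the later replies: the same three strings always produce the same hash. A quick stand-alone check (a trimmed copy of the class, here nested only so the sketch compiles as one file) illustrates that the non-determinism Flink reports must come from the inputs, such as a wall-clock-derived countDateTime, not from this implementation:

```java
public class KeyHashCheck {
    static final class MyEventCountKey {
        private final String countDateTime, domain, event;

        MyEventCountKey(String countDateTime, String domain, String event) {
            this.countDateTime = countDateTime;
            this.domain = domain;
            this.event = event;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            MyEventCountKey that = (MyEventCountKey) o;
            return countDateTime.equals(that.countDateTime)
                    && domain.equals(that.domain)
                    && event.equals(that.event);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + countDateTime.hashCode();
            result = prime * result + domain.hashCode();
            result = prime * result + event.hashCode();
            return result;
        }
    }

    public static void main(String[] args) {
        MyEventCountKey a = new MyEventCountKey("2022-01-01T10:02:00", "cnn.com", "/article1");
        MyEventCountKey b = new MyEventCountKey("2022-01-01T10:02:00", "cnn.com", "/article1");
        // Two instances with the same field values: equal, and same hash.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true
    }
}
```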