Hello John,

During the lifecycle of an event's execution, the key is not passed between
operators. Instead, it is recomputed from the given key selector every time
a (keyed) operator sees the event.
Therefore, the same event, within the same pipeline, could be assigned a
different key as it moves along the graph of operators. The time-based part
of your key selector (the Instant.now() call) is not deterministic, since it
depends on when the key selector function is executed. My suggestion would
be to materialise the key as an additional field of your event at the
beginning of the pipeline and then use that field as the key.
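Here is a minimal sketch of that suggestion. It is written in plain Java so it runs without Flink. The names ClickEvent, stampMinute and keyOf are hypothetical. The minute bucket is a simple truncation, which is equivalent to windowSizeMins = 1. The key is a plain String rather than MyEventCountKey. In a real job, stampMinute would sit in a map() right after the source, and keyOf would be the KeySelector passed to keyBy(). The point is only that the clock is read once, and after that the selector reads fields.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class MaterialisedKeyExample {

    // Hypothetical event type: the minute bucket is stored on the event itself.
    static final class ClickEvent {
        final String domain;
        final String path;
        final String minute; // filled in once, at the start of the pipeline

        ClickEvent(String domain, String path, String minute) {
            this.domain = domain;
            this.path = path;
            this.minute = minute;
        }
    }

    // Step 1 (e.g. a map() right after the source): read the clock once
    // and truncate it to the minute.
    static ClickEvent stampMinute(String domain, String path, Instant now) {
        String minute = now.truncatedTo(ChronoUnit.MINUTES).toString();
        return new ClickEvent(domain, path, minute);
    }

    // Step 2 (the key selector): only reads fields, never the clock,
    // so repeated calls on the same event return equal keys.
    static String keyOf(ClickEvent e) {
        return e.minute + "|" + e.domain + "|" + e.path;
    }

    public static void main(String[] args) {
        ClickEvent e = stampMinute("cnn.com", "/some-article-name", Instant.now());
        String first = keyOf(e);
        String second = keyOf(e); // e.g. evaluated again by a downstream operator
        System.out.println(first + " deterministic=" + first.equals(second));
    }
}
```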

Sincerely,

Ali

On Mon, Feb 7, 2022 at 7:06 PM John Smith <java.dev....@gmail.com> wrote:

> Maybe there's a misunderstanding, but basically I want to do a clickstream
> count for a given "url" and, for simplicity and accuracy of the count, base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time).
>
> So, regardless of the event time, I want all clicks for the current
> processing time, rounded to the minute, per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> ....
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
>     MyEventCountKey key = new MyEventCountKey(
>             Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>                     ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>             click.getDomain(), // cnn.com
>             click.getPath()    // /some-article-name
>     );
>     return key;
> }
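This sketch shows why the selector above is not deterministic. It makes two changes, both only so the example is reproducible. First, it replaces Instant.now() with an injected java.time.Clock. Second, it truncates to the minute instead of calling the roundFloor helper, which is not shown in the thread. The same click is keyed twice, 200 ms apart, on either side of a minute boundary. That can happen when different operators recompute the key.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class ClockKeyDemo {

    // Same shape as getKey() above, but the clock is a parameter so the
    // evaluation time can be controlled.
    static String keyAt(Clock clock, String domain, String path) {
        String minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES).toString();
        return minute + "|" + domain + "|" + path;
    }

    public static void main(String[] args) {
        Clock beforeBoundary = Clock.fixed(Instant.parse("2022-04-07T12:00:59.900Z"), ZoneOffset.UTC);
        Clock afterBoundary = Clock.fixed(Instant.parse("2022-04-07T12:01:00.100Z"), ZoneOffset.UTC);

        // The partitioner evaluates the selector first...
        String atPartitioner = keyAt(beforeBoundary, "cnn.com", "/some-article-name");
        // ...and the keyed operator evaluates it again 200 ms later.
        String atOperator = keyAt(afterBoundary, "cnn.com", "/some-article-name");

        System.out.println(atPartitioner); // 2022-04-07T12:00:00Z|cnn.com|/some-article-name
        System.out.println(atOperator);    // 2022-04-07T12:01:00Z|cnn.com|/some-article-name
        System.out.println("same key: " + atPartitioner.equals(atOperator)); // same key: false
    }
}
```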
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>
>>> The key selector works.
>>
>>
>> No, it does not ;) It depends on the system time, so it's not deterministic
>> (you can get different keys for the very same element).
>>
>>> How do you key a count based on the time? I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
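For comparison, a windowed count would key only on domain and path. The window assigner then attaches the time bucket once per element. In Flink 1.14 this is roughly keyBy(...) followed by .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) and an aggregate.

The plain-Java sketch below only imitates that bucketing so it can run standalone. Click and countPerMinute are hypothetical names. windowStart uses the usual tumbling-window arithmetic: start = timestamp minus (timestamp modulo window size), with no offset.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingCountSketch {

    // Hypothetical click with the processing time (epoch millis) at which it was seen.
    static final class Click {
        final long seenAtMillis;
        final String domain;
        final String path;

        Click(long seenAtMillis, String domain, String path) {
            this.seenAtMillis = seenAtMillis;
            this.domain = domain;
            this.path = path;
        }
    }

    // Start of the tumbling window that contains ts (zero offset).
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // Counts clicks per (window start, domain, path). The grouping key is
    // domain|path; the window start is computed once per click, like an assigner would.
    static Map<String, Integer> countPerMinute(List<Click> clicks) {
        long minute = 60_000L;
        Map<String, Integer> counts = new TreeMap<>();
        for (Click c : clicks) {
            String window = Instant.ofEpochMilli(windowStart(c.seenAtMillis, minute)).toString();
            counts.merge(window + "|" + c.domain + "|" + c.path, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long t0 = Instant.parse("2022-04-07T12:01:00Z").toEpochMilli();
        List<Click> clicks = List.of(
                new Click(t0 + 1_000, "cnn.com", "/some-article-name"),
                new Click(t0 + 59_999, "cnn.com", "/some-article-name"),
                new Click(t0 + 60_000, "cnn.com", "/some-article-name"),
                new Click(t0 + 5_000, "cnn.com", "/another-article"));
        countPerMinute(clicks).forEach((k, v) -> System.out.println(k + " count = " + v));
    }
}
```

The output has the same shape as the per-minute listing John asked for. Because the selector never looks at the clock, recomputing the key downstream always yields the same value.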
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> The key selector works. It only causes an issue if too many keys are
>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>> 1 minute, it's OK, but if 101 are produced, the error happens.
>>>
>>>
>>> If you look at the reproducer, at least that's what's happening.
>>>
>>> How do you key a count based on the time? I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
>>> wrote:
>>>
>>>> Your KeySelector doesn't need to implement hashCode, but given the
>>>> same object, it has to return the same key.
>>>> In your reproducer the returned key will have different timestamps, and
>>>> since the timestamp is included in the hashCode, they will be different
>>>> each time.
>>>>
>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>
>>>> I don't get it. I provided the reproducer. I implemented the KeySelector
>>>> interface; does it need hashCode and equals as well?
>>>>
>>>> I'm attempting to do clickstream counting, so the key is based on the
>>>> processing date/time rounded to the minute + domain name + path.
>>>>
>>>> So shouldn't the keys below be valid?
>>>>
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>
>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>
>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>
>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>
>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> Don't KeySelectors also need to be deterministic?
>>>>>
>>>>>  * The {@link KeySelector} allows to use deterministic objects for operations such as reduce,
>>>>>  * reduceGroup, join, coGroup, etc. <b>If invoked multiple times on the same object, the returned
>>>>>  * key must be the same.</b>
>>>>>
>>>>>
>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>
>>>>> Hi Francesco, here is the reproducer:
>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>
>>>>> So, essentially, it looks like the exception is thrown when there's a
>>>>> high influx of records produced from the source.
>>>>>
>>>>> The key is generated from 3 values: the date/time rounded to the minute
>>>>> and 2 strings.
>>>>> So you will see keys as follows...
>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>
>>>>> The reproducer has a custom source that basically produces a record in
>>>>> a loop and sleeps for a specified number of milliseconds (100 ms in this
>>>>> case) between records.
>>>>> The lower the sleep delay, the faster records are produced and the more
>>>>> likely the exception is thrown. With a 100 ms delay it's always thrown;
>>>>> setting it to 2000 to 3000 ms guarantees that it works.
>>>>> The original job uses a Kafka source, so it should technically be able
>>>>> to handle even a couple thousand records per second.
>>>>>
>>>>>
>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> OK, it's not my data either. I think it may be a volume issue. I have
>>>>>> managed to consistently reproduce the error. I'll upload a reproducer
>>>>>> ASAP.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> OK, so I tried to create a reproducer, but I couldn't reproduce it.
>>>>>>> But the actual job throws that error once in a while. So I'm wondering
>>>>>>> if maybe one of the incoming records is not valid, though I do validate
>>>>>>> records before they reach the key and window operators.
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Actually, maybe not, because with PrintSinkFunction it ran for a bit
>>>>>>>> and then threw the error.
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> OK, maybe it's the Elasticsearch connector causing the issue?
>>>>>>>>>
>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats print
>>>>>>>>> as expected.
>>>>>>>>>
>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> your hashCode and equals seem correct. Can you post a minimal
>>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>>
>>>>>>>>>> FG
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, I'm getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're
>>>>>>>>>>> directly using low level state access APIs, this is most likely caused by
>>>>>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
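The numbers in this exception follow from how keys are spread over subtasks. The key's hashCode() is hashed into one of maxParallelism key groups, and each subtask owns a contiguous range of those groups.

The sketch below is modelled on that scheme, under two assumptions:
- a max parallelism of 128 and a parallelism of 16. Neither value appears in the thread; they are just one combination that produces the range 96..103.
- a generic integer mixer standing in for Flink's internal murmur hash. So the group a particular key lands in will differ from a real job.

```java
final class KeyGroupSketch {

    // Stand-in integer mixer (NOT Flink's exact hash); result is non-negative.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff;
    }

    // Key group for a key: hash of hashCode() modulo max parallelism.
    static int keyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Subtask that owns a given key group.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Contiguous range of key groups owned by one subtask: {start, end}, inclusive.
    static int[] range(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxP = 128;
        int p = 16;
        int[] r = range(12, maxP, p);
        System.out.println("subtask 12 owns key groups " + r[0] + ".." + r[1]); // 96..103
        System.out.println("key group 39 belongs to subtask " + operatorIndex(39, maxP, p)); // 4

        // Two keys that differ only in their minute usually land in different groups.
        String before = "2022-02-04T17:20:00Z|foo|bar";
        String after = "2022-02-04T17:21:00Z|foo|bar";
        System.out.println(keyGroup(before, maxP) + " vs " + keyGroup(after, maxP));
    }
}
```

Suppose the partitioner routes an element using one key, such as the 17:20 bucket. If the receiving operator then recomputes the 17:21 key, the new key group can fall outside that subtask's range. That is the situation the exception describes.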
>>>>>>>>>>>
>>>>>>>>>>> This is my class. Is my hashCode deterministic?
>>>>>>>>>>>
>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>     private final String event;
>>>>>>>>>>>
>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain,
>>>>>>>>>>>             final String event) {
>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>         return domain;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>         return event;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>         return result;
>>>>>>>>>>>     }
>>>>>>>>>>> }
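As Francesco notes, equals() and hashCode() in this class are fine. A more compact equivalent is sketched below using java.util.Objects, with getters omitted for brevity. Objects.hash(a, b, c) applies the same 31 * result + h recurrence starting from 1. So for non-null fields it returns exactly the same values as the hand-written hashCode().

The null checks in the constructor are an addition; the original would instead throw a NullPointerException later, inside equals() or hashCode(). This version does not fix the exception, which comes from the time-dependent countDateTime value rather than from the hashing.

```java
import java.util.Objects;

final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    MyEventCountKey(String countDateTime, String domain, String event) {
        // Reject nulls up front so equals()/hashCode() never see them.
        this.countDateTime = Objects.requireNonNull(countDateTime);
        this.domain = Objects.requireNonNull(domain);
        this.event = Objects.requireNonNull(event);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyEventCountKey that = (MyEventCountKey) o;
        return countDateTime.equals(that.countDateTime)
                && domain.equals(that.domain)
                && event.equals(that.event);
    }

    @Override
    public int hashCode() {
        // Same value as 31 * (31 * (31 * 1 + a) + b) + c in the original.
        return Objects.hash(countDateTime, domain, event);
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }
}
```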
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>
>>>>
