Hello John,

Currently you are grouping the elements twice based on a time attribute:
once while keying (with event time) and once while windowing (with
processing time). Therefore, the windowing mechanism produces a new window
computation whenever it sees an element with the same key that arrives after
the previous window's start and end timestamps. Can you please clarify
which notion of time you would like to use to handle the stream of data?
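
If the answer is purely processing time, one option is to key only on the
stable fields (domain and path) and let the window itself supply the minute.
A rough, untested sketch (the lambda key and the plain-String output here are
only illustrative, not your exact classes):

slStream
        // Key only on fields of the element, so the key is deterministic.
        .keyBy(click -> click.getDomain() + "|" + click.getPath())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
        .process(new ProcessWindowFunction<MyEvent, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<MyEvent> clicks,
                                Collector<String> out) {
                long count = 0;
                for (MyEvent ignored : clicks) {
                    count++;
                }
                // The window start, not the wall clock, supplies the rounded-down
                // minute, so each key is counted exactly once per window.
                Instant windowStart = Instant.ofEpochMilli(ctx.window().getStart());
                out.collect(windowStart + "|" + key + "|" + count);
            }
        });

That way the timestamp lives only in the window: every element that falls into
the same window for the same domain/path contributes to a single emitted count.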

Sincerely,

Ali

On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:

> Ok I used the method suggested by Ali. The error is gone. But now I see
> multiple counts emitted for the same key...
>
> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>                 WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .uid(kafkaTopic).name(kafkaTopic)
>         .setParallelism(kafkaParallelism)
>         // Timestamp in GMT created here, rounded down to the closest minute.
>         .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>         .uid("map-json-logs").name("map-json-logs");
>
> slStream.keyBy(new MinutesKeySelector())
>         // Tumbling window of 1 minute.
>         .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>
>
>
> So below you will see that a second count was emitted for 16:51 and for 16:55:
>
> {"countId":"2022-02-11T16:50:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
> mydomain.com","uri":"/some-article","count":3542}
> -----
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16503}
> {"countId":"2022-02-11T16:51:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
> mydomain.com","uri":"/some-article","count":70}
> -----
>
> {"countId":"2022-02-11T16:52:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
> mydomain.com","uri":"/some-article","count":16037}
> {"countId":"2022-02-11T16:53:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18679}
> {"countId":"2022-02-11T16:54:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17697}
> -----
>
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":18066}
> {"countId":"2022-02-11T16:55:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
> mydomain.com","uri":"/some-article","count":58}
> -----
> {"countId":"2022-02-11T16:56:00Z|mydomain.com
> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
> mydomain.com","uri":"/some-article","count":17489}
>
>
>
>
> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Ok I think Ali's solution makes the most sense to me. I'll try it and let
>> you know.
>>
>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>
>>> Hi John,
>>>
>>> Your getKey() implementation is not deterministic, since calling it with
>>> the same click instance multiple times will return different keys. For
>>> example, a call at 12:01:59.950 and a call at 12:02:00.050 with the same
>>> click instance will return two different keys:
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>
>>> best regards
>>> Jing
>>>
>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Maybe there's a misunderstanding, but basically I want to do a clickstream
>>>> count for a given "url", and for simplicity and accuracy I want to base the
>>>> count on processing time (event time doesn't matter as long as I get a
>>>> total of clicks at that given processing time).
>>>>
>>>> So regardless of the event time, I want all clicks for the current
>>>> processing time, rounded to the minute, per link.
>>>> So, if now was 2022-04-07T12:01:00.000Z
>>>>
>>>> Then I would want the following result...
>>>>
>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>> ....
>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>> And so on...
>>>>
>>>> @Override
>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>>     MyEventCountKey key = new MyEventCountKey(
>>>>             Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>                     ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>             click.getDomain(), // cnn.com
>>>>             click.getPath()    // /some-article-name
>>>>     );
>>>>     return key;
>>>> }
>>>>
>>>>
>>>>
>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> The key selector works.
>>>>>
>>>>>
>>>>> No it does not ;) It depends on the system time so it's not
>>>>> deterministic (you can get different keys for the very same element).
>>>>>
>>>>>> How do you key a count based on the time? I have taken this from samples
>>>>>> online.
>>>>>>
>>>>>
>>>>> This is what the windowing is for. You basically want to group /
>>>>> combine elements per key and event time window [1].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
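>>>>>
>>>>> A rough, untested sketch of that (it assumes MyEvent carries an epoch-millis
>>>>> event timestamp reachable via a getTimestamp() accessor; "clickStream" and
>>>>> "CountPerWindow" are only placeholders for your stream and your counting
>>>>> window function):
>>>>>
>>>>> clickStream   // your DataStream<MyEvent>
>>>>>         .assignTimestampsAndWatermarks(
>>>>>                 WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
>>>>>                         .withTimestampAssigner((event, ts) -> event.getTimestamp()))
>>>>>         .keyBy(e -> e.getDomain() + "|" + e.getPath())
>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>>>         .process(new CountPerWindow());  // emits one count per key and window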
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The key selector works. It only causes an issue if there are too many keys
>>>>>> produced in one shot. For example, if 100 "same" keys are produced for that
>>>>>> 1 minute it's ok, but if 101 are produced the error happens.
>>>>>>
>>>>>>
>>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>>
>>>>>> How do you key a count based on the time? I have taken this from samples
>>>>>> online.
>>>>>>
>>>>>> The key is that particular time for that particular URL path.
>>>>>>
>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>>>
>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <
>>>>>> ches...@apache.org> wrote:
>>>>>>
>>>>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>>>>> same object it has to return the same key.
>>>>>>> In your reproducer the returned key will have different timestamps,
>>>>>>> and since the timestamp is included in the hashCode, they will be 
>>>>>>> different
>>>>>>> each time.
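>>>>>>>
>>>>>>> A deterministic variant (untested sketch only) would derive the minute from
>>>>>>> a timestamp stored on the element itself, e.g. an assumed getTimestamp()
>>>>>>> accessor returning epoch millis, rather than from Instant.now():
>>>>>>>
>>>>>>> public class MinutesKeySelector implements KeySelector<MyEvent, MyEventCountKey> {
>>>>>>>     @Override
>>>>>>>     public MyEventCountKey getKey(MyEvent click) {
>>>>>>>         // Built only from fields of the element, so repeated calls on the
>>>>>>>         // same element always return the same key (and the same hashCode).
>>>>>>>         Instant minute = Instant.ofEpochMilli(click.getTimestamp())
>>>>>>>                 .truncatedTo(ChronoUnit.MINUTES);
>>>>>>>         return new MyEventCountKey(minute.toString(), click.getDomain(), click.getPath());
>>>>>>>     }
>>>>>>> }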
>>>>>>>
>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>
>>>>>>> I don't get it? I provided the reproducer. I implemented the KeySelector
>>>>>>> interface; does it need hashCode and equals as well?
>>>>>>>
>>>>>>> I'm attempting to do a clickstream count. So the key is based on the
>>>>>>> processing date/time rounded to the minute + domain name + path
>>>>>>>
>>>>>>> So these should be valid below?
>>>>>>>
>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>
>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>
>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>
>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>
>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <
>>>>>>> ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>
>>>>>>>> The {@link KeySelector} allows to use deterministic objects for operations
>>>>>>>> such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
>>>>>>>> on the same object, the returned key must be the same.
>>>>>>>>
>>>>>>>>
>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>
>>>>>>>> Hi Francesco,  here is the reproducer:
>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>
>>>>>>>> So, essentially, it looks like the exception is thrown when there's a high
>>>>>>>> influx of records produced by the source.
>>>>>>>>
>>>>>>>> The key is generated by 3 values: date/time rounded to the minute
>>>>>>>> and 2 strings.
>>>>>>>> So you will see keys as follows...
>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>
>>>>>>>> The reproducer has a custom source that basically produces a record in a
>>>>>>>> loop and then sleeps for a specified period of milliseconds, 100 ms in this
>>>>>>>> case.
>>>>>>>> The lower the sleep delay, the faster records are produced and the more
>>>>>>>> likely the exception is thrown. With a 100 ms delay it is always thrown;
>>>>>>>> setting it to 2000-3000 ms guarantees it works.
>>>>>>>> The original job uses a Kafka source, so it should technically be able to
>>>>>>>> handle even a couple of thousand records per second.
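>>>>>>>>
>>>>>>>> (For illustration only, not the actual reproducer code: a throttled source
>>>>>>>> of that general shape, emit one record and then sleep, could look like
>>>>>>>> this.)
>>>>>>>>
>>>>>>>> public class ThrottledSource implements SourceFunction<String> {
>>>>>>>>     private final long sleepMillis;
>>>>>>>>     private volatile boolean running = true;
>>>>>>>>
>>>>>>>>     public ThrottledSource(long sleepMillis) {
>>>>>>>>         this.sleepMillis = sleepMillis;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void run(SourceContext<String> ctx) throws Exception {
>>>>>>>>         while (running) {
>>>>>>>>             ctx.collect("{\"domain\":\"foo\",\"path\":\"bar\"}");
>>>>>>>>             // 100 ms reliably reproduces the error; 2000-3000 ms does not.
>>>>>>>>             Thread.sleep(sleepMillis);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void cancel() {
>>>>>>>>         running = false;
>>>>>>>>     }
>>>>>>>> }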
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ok it's not my data either. I think it may be a volume issue. I
>>>>>>>>> have managed to consistently reproduce the error. I'll upload a 
>>>>>>>>> reproducer
>>>>>>>>> ASAP.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it.
>>>>>>>>>> But the actual job once in a while throws that error. So I'm 
>>>>>>>>>> wondering if
>>>>>>>>>> maybe one of the records that comes in is not valid, though I do 
>>>>>>>>>> validate
>>>>>>>>>> prior to getting to the key and window operators.
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a
>>>>>>>>>>> bit and then it threw the error.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>>>
>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>>> print as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> Your hashCode and equals seem correct. Can you post a minimal stream
>>>>>>>>>>>>> pipeline reproducer using this class?
>>>>>>>>>>>>>
>>>>>>>>>>>>> FG
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <
>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39
>>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. 
>>>>>>>>>>>>>> Unless you're
>>>>>>>>>>>>>> directly using low level state access APIs, this is most likely 
>>>>>>>>>>>>>> caused by
>>>>>>>>>>>>>> non-deterministic shuffle key (hashCode and equals 
>>>>>>>>>>>>>> implementation).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is my class; is my hashCode deterministic?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain,
>>>>>>>>>>>>>>                            final String event) {
>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public String getEvent() {
>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime)
>>>>>>>>>>>>>>                 && domain.equals(that.domain)
>>>>>>>>>>>>>>                 && event.equals(that.event);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>
