Hello John,

That is what exactly the window operator does for you. Can you please check
the
documentation[1] and let us know what part of the window operator alone does
not suffice for the use case?

Sincerely,

Ali

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows

On Mon, Feb 14, 2022 at 4:03 PM John Smith <java.dev....@gmail.com> wrote:

> Because I want to group them for the last X minutes. In this case last 1
> minute.
>
> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek <a...@ververica.com>
> wrote:
>
>> Hello John,
>>
>> Then may I ask you why you need to use a time attribute as part of your
>> key?
>> Why not just key by the fields like `mydomain.com` and `some-article` in
>> your
>> example and use only window operator for grouping elements based on time?
>>
>> Sincerely,
>>
>> Ali
>>
>> On Mon, Feb 14, 2022 at 3:55 PM John Smith <java.dev....@gmail.com>
>> wrote:
>>
>>> Hi, thanks. As previously mentioned, processing time. So I
>>> regardless when the event was generated I want to count all events I have
>>> right now (as soon as they are seen by the flink job).
>>>
>>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com>
>>> wrote:
>>>
>>>> Hello John,
>>>>
>>>> Currently you are grouping the elements two times based on some time
>>>> attribute, one while keying - with event time - and one while windowing
>>>> - with
>>>> processing time. Therefore, the windowing mechanism produces a new
>>>> window
>>>> computation when you see an element with the same key but arrived later
>>>> from
>>>> the previous window start and end timestamps. Can you please clarify
>>>> with
>>>> which notion of time you would like to handle the stream of data?
>>>>
>>>> Sincerely,
>>>>
>>>> Ali
>>>>
>>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Ok I used the method suggested by Ali. The error is gone. But now I
>>>>> see multiple counts emitted for the same key...
>>>>>
>>>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource, 
>>>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>>>         .uid(kafkaTopic).name(kafkaTopic)
>>>>>         .setParallelism(kafkaParallelism)
>>>>>         .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
>>>>> <------ Timestamp in GMT created here rounded to the closest minute down.
>>>>>         .uid("map-json-logs").name("map-json-logs");
>>>>>
>>>>>         slStream.keyBy(new MinutesKeySelector())
>>>>>         
>>>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>>>>> <---- Tumbling window of 1 minute.
>>>>>
>>>>>
>>>>>
>>>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>>>
>>>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":3542}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":16503}
>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":70}
>>>>> -----
>>>>>
>>>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":16037}
>>>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":18679}
>>>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":17697}
>>>>> -----
>>>>>
>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":18066}
>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":58}
>>>>> -----
>>>>> {"countId":"2022-02-11T16:56:00Z|mydomain.com
>>>>> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
>>>>> mydomain.com","uri":"/some-article","count":17489}
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok I think Ali's solution makes the most sense to me. I'll try it and
>>>>>> let you know.
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> your getKey() implementation shows that it is not deterministic,
>>>>>>> since calling it with the same click instance multiple times will return
>>>>>>> different keys. For example a call at 12:01:59.950 and a call at
>>>>>>> 12:02:00.050 with the same click instance will return two different 
>>>>>>> keys:
>>>>>>>
>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>>>>>
>>>>>>> best regards
>>>>>>> Jing
>>>>>>>
>>>>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Maybe there's a misunderstanding. But basically I want to
>>>>>>>> do clickstream count for a given "url" and for simplicity and accuracy 
>>>>>>>> of
>>>>>>>> the count base it on processing time (event time doesn't matter as 
>>>>>>>> long as
>>>>>>>> I get a total of clicks at that given processing time)
>>>>>>>>
>>>>>>>> So regardless of the event time. I want all clicks for the current
>>>>>>>> processing time rounded to the minute per link.
>>>>>>>>
>>>>>>>> So, if now was 2022-04-07T12:01:00.000Z
>>>>>>>>
>>>>>>>> Then I would want the following result...
>>>>>>>>
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>>>>>> ....
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>>>>>> And so on...
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>>>>>>>> {
>>>>>>>> MyEventCountKey key = new MyEventCountKey(
>>>>>>>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>>>>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>>>>> click.getDomain(), // cnn.com
>>>>>>>> click.getPath(), // /some-article-name
>>>>>>>> );
>>>>>>>> return key;
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The key selector works.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> No it does not ;) It depends on the system time so it's not
>>>>>>>>> deterministic (you can get different keys for the very same element).
>>>>>>>>>
>>>>>>>>> How do you key a count based on the time. I have taken this from
>>>>>>>>>> samples online.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is what the windowing is for. You basically want to group /
>>>>>>>>> combine elements per key and event time window [1].
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The key selector works. It only causes an issue if there too many
>>>>>>>>>> keys produced in one shot. For example of 100 "same" keys are 
>>>>>>>>>> produced for
>>>>>>>>>> that 1 minutes it's ok. But if 101 are produced the error happens.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If you look at the reproducer at least that's what's hapenning
>>>>>>>>>>
>>>>>>>>>> How do you key a count based on the time. I have taken this from
>>>>>>>>>> samples online.
>>>>>>>>>>
>>>>>>>>>> The key is that particular time for that particular URL path.
>>>>>>>>>>
>>>>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>>>>>>>
>>>>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <
>>>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Your Key selector doesn't need to implement hashCode, but given
>>>>>>>>>>> the same object it has to return the same key.
>>>>>>>>>>> In your reproducer the returned key will have different
>>>>>>>>>>> timestamps, and since the timestamp is included in the hashCode, 
>>>>>>>>>>> they will
>>>>>>>>>>> be different each time.
>>>>>>>>>>>
>>>>>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>>>>>
>>>>>>>>>>> I don't get it? I provided the reproducer. I implemented the
>>>>>>>>>>> interface to Key selector it needs hashcode and equals as well?
>>>>>>>>>>>
>>>>>>>>>>> I'm attempting to do click stream. So the key is based on
>>>>>>>>>>> processing date/time rounded to the minute + domain name + path
>>>>>>>>>>>
>>>>>>>>>>> So these should be valid below?
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>>>>
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>>>>
>>>>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <
>>>>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>>>>>
>>>>>>>>>>>> * The {@link KeySelector} allows to use deterministic objects for 
>>>>>>>>>>>> operations such as reduce,* reduceGroup, join, coGroup, etc. *If 
>>>>>>>>>>>> invoked multiple times on the same object, the returned key*** 
>>>>>>>>>>>> must be the same.*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Francesco,  here is the reproducer:
>>>>>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>>>>>
>>>>>>>>>>>> So, essentially it looks like when there's a high influx of
>>>>>>>>>>>> records produced from the source that the Exception is thrown.
>>>>>>>>>>>>
>>>>>>>>>>>> The key is generated by 3 values: date/time rounded to the
>>>>>>>>>>>> minute and 2 strings.
>>>>>>>>>>>> So you will see keys as follows...
>>>>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>>>>>
>>>>>>>>>>>> The reproducer has a custom source that basically produces a
>>>>>>>>>>>> record in a loop and sleeps for a specified period of milliseconds 
>>>>>>>>>>>> 100ms in
>>>>>>>>>>>> this case.
>>>>>>>>>>>> The lower the sleep delay the faster records are produced the
>>>>>>>>>>>> more chances the exception is thrown. With a 100ms delay it's 
>>>>>>>>>>>> always
>>>>>>>>>>>> thrown. Setting a 2000 to 3000ms will guarantee it to work.
>>>>>>>>>>>> The original job uses a Kafka Source so it should technically
>>>>>>>>>>>> be able to handle even a couple thousand records per second.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok it's not my data either. I think it may be a volume issue.
>>>>>>>>>>>>> I have managed to consistently reproduce the error. I'll upload a
>>>>>>>>>>>>> reproducer ASAP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <
>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce
>>>>>>>>>>>>>> it. But the actual job once in a while throws that error. So I'm 
>>>>>>>>>>>>>> wondering
>>>>>>>>>>>>>> if maybe one of the records that comes in is not valid, though I 
>>>>>>>>>>>>>> do
>>>>>>>>>>>>>> validate prior to getting to the key and window operators.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <
>>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for
>>>>>>>>>>>>>>> a bit and then it threw the error.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <
>>>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>>>>>>> print as expected.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> your hash code and equals seems correct. Can you post a
>>>>>>>>>>>>>>>>> minimum stream pipeline reproducer using this class?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <
>>>>>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group
>>>>>>>>>>>>>>>>>> 39 is not in KeyGroupRange{startKeyGroup=96, 
>>>>>>>>>>>>>>>>>> endKeyGroup=103}. Unless
>>>>>>>>>>>>>>>>>> you're directly using low level state access APIs, this is 
>>>>>>>>>>>>>>>>>> most likely
>>>>>>>>>>>>>>>>>> caused by non-deterministic shuffle key (hashCode and equals
>>>>>>>>>>>>>>>>>> implementation).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>>>>     private final String countDateTime;    private final 
>>>>>>>>>>>>>>>>>> String domain;    private final String event;    public 
>>>>>>>>>>>>>>>>>> MyEventCountKey(final String countDateTime, final String 
>>>>>>>>>>>>>>>>>> domain, final String event) {
>>>>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;        
>>>>>>>>>>>>>>>>>> this.domain = domain;        this.event = event;    }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>>>>         return countDateTime;    }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>>>>         return domain;    }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>>>>>>>>         return event;    }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override    public String toString() {
>>>>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;   
>>>>>>>>>>>>>>>>>>  }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override    public boolean equals(Object o) {
>>>>>>>>>>>>>>>>>>         if (this == o) return true;        if (o == null || 
>>>>>>>>>>>>>>>>>> getClass() != o.getClass()) return false;        
>>>>>>>>>>>>>>>>>> MyEventCountKey that = (MyEventCountKey) o;        return 
>>>>>>>>>>>>>>>>>> countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>>>>>>>>                 event.equals(that.event);    }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override    public int hashCode() {
>>>>>>>>>>>>>>>>>>         final int prime = 31;        int result = 1;        
>>>>>>>>>>>>>>>>>> result = prime * result + countDateTime.hashCode();        
>>>>>>>>>>>>>>>>>> result = prime * result + domain.hashCode();        result = 
>>>>>>>>>>>>>>>>>> prime * result +  event.hashCode();        return result;    
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to