Hi, thanks. As previously mentioned, processing time. Regardless of when
the event was generated, I want to count all the events I have right now
(as soon as they are seen by the Flink job).
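
A minimal plain-Java sketch of that intent, with no Flink dependency: the
stable part of the key comes only from the event (domain and path), and the
minute bucket comes only from the time the record is seen. All names here
(ProcessingTimeCountSketch, stableKey, windowStart, count) are hypothetical
and not taken from the job. In a Flink job the same split would presumably
mean keying on domain and path alone and letting
TumblingProcessingTimeWindows assign the minute, but that mapping is an
assumption, not something confirmed in this thread.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this simulates the bucketing, it is not Flink code.
final class ProcessingTimeCountSketch {

    /** Stable part of the key: derived only from the event, never from the clock. */
    static String stableKey(String domain, String path) {
        return domain + "|" + path;
    }

    /** Start of the one-minute bucket that contains the given processing time. */
    static Instant windowStart(Instant processingTime) {
        return processingTime.truncatedTo(ChronoUnit.MINUTES);
    }

    /**
     * Counts clicks per (window start, domain, path).
     * Each arrival is {processingTime as ISO-8601 instant, domain, path}.
     */
    static Map<String, Long> count(String[][] arrivals) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] arrival : arrivals) {
            Instant seenAt = Instant.parse(arrival[0]);
            String countId = windowStart(seenAt) + "|" + stableKey(arrival[1], arrival[2]);
            counts.merge(countId, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] arrivals = {
            {"2022-02-11T16:51:10Z", "mydomain.com", "/some-article"},
            {"2022-02-11T16:51:59.950Z", "mydomain.com", "/some-article"},
            {"2022-02-11T16:52:00.050Z", "mydomain.com", "/some-article"},
        };
        // One entry per minute bucket and link, each with a single count.
        count(arrivals).forEach((id, n) -> System.out.println(id + " count = " + n));
    }
}
```

The point of the split is that the time component is decided once, by the
bucket the record lands in, so the same key can never end up with two
different minutes attached to it.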

On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com>
wrote:

> Hello John,
>
> Currently you are grouping the elements twice based on a time attribute:
> once while keying, with event time, and once while windowing, with
> processing time. Therefore, the windowing mechanism produces a new window
> computation when it sees an element with the same key that arrived after
> the previous window's start and end timestamps. Can you please clarify
> which notion of time you would like to use to handle the stream of data?
>
> Sincerely,
>
> Ali
>
> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Ok I used the method suggested by Ali. The error is gone. But now I see
>> multiple counts emitted for the same key...
>>
>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
>>                 WatermarkStrategy.noWatermarks(), "Kafka Source")
>>         .uid(kafkaTopic).name(kafkaTopic)
>>         .setParallelism(kafkaParallelism)
>>         // Timestamp in GMT is created here, rounded down to the minute.
>>         .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
>>         .uid("map-json-logs").name("map-json-logs");
>>
>> slStream.keyBy(new MinutesKeySelector())
>>         // Tumbling window of 1 minute.
>>         .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
>>
>>
>>
>> So below you will see a new count was emitted at 16:51 and 16:55
>>
>> {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
>> -----
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
>> -----
>>
>> {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
>> {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
>> {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
>> -----
>>
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
>> -----
>> {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}
>>
>>
>>
>>
>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com>
>> wrote:
>>
>>> Ok I think Ali's solution makes the most sense to me. I'll try it and
>>> let you know.
>>>
>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> your getKey() implementation shows that it is not deterministic, since
>>>> calling it with the same click instance multiple times will return
>>>> different keys. For example a call at 12:01:59.950 and a call at
>>>> 12:02:00.050 with the same click instance will return two different keys:
>>>>
>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>>>
>>>> best regards
>>>> Jing
>>>>
>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Maybe there's a misunderstanding. Basically, I want to do a clickstream
>>>>> count for a given "url" and, for simplicity and accuracy of the count,
>>>>> base it on processing time (event time doesn't matter as long as I get
>>>>> a total of clicks at that given processing time).
>>>>>
>>>>> So, regardless of the event time, I want all clicks for the current
>>>>> processing time, rounded to the minute, per link.
>>>>>
>>>>> So, if now was 2022-04-07T12:01:00.000Z
>>>>>
>>>>> Then I would want the following result...
>>>>>
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>>> ....
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>>>> And so on...
>>>>>
>>>>> @Override
>>>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>>>     MyEventCountKey key = new MyEventCountKey(
>>>>>             Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>>>                     ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>>>             click.getDomain(), // cnn.com
>>>>>             click.getPath()    // /some-article-name
>>>>>     );
>>>>>     return key;
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>>>
>>>>>>> The key selector works.
>>>>>>
>>>>>> No it does not ;) It depends on the system time so it's not
>>>>>> deterministic (you can get different keys for the very same element).
>>>>>>
>>>>>>> How do you key a count based on the time. I have taken this from
>>>>>>> samples online.
>>>>>>
>>>>>> This is what the windowing is for. You basically want to group /
>>>>>> combine elements per key and event time window [1].
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The key selector works. It only causes an issue if there are too many
>>>>>>> keys produced in one shot. For example, if 100 "same" keys are produced
>>>>>>> for that 1 minute it's ok. But if 101 are produced the error happens.
>>>>>>>
>>>>>>>
>>>>>>> If you look at the reproducer, at least that's what's happening.
>>>>>>>
>>>>>>> How do you key a count based on the time. I have taken this from
>>>>>>> samples online.
>>>>>>>
>>>>>>> The key is that particular time for that particular URL path.
>>>>>>>
>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>>>>
>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <
>>>>>>> ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>>>>>> same object it has to return the same key.
>>>>>>>> In your reproducer the returned key will have different timestamps,
>>>>>>>> and since the timestamp is included in the hashCode, they will be
>>>>>>>> different each time.
>>>>>>>>
>>>>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>>>>
>>>>>>>> I don't get it. I provided the reproducer. I implemented the
>>>>>>>> KeySelector interface; does it need hashCode and equals as well?
>>>>>>>>
>>>>>>>> I'm attempting to do click stream. So the key is based on
>>>>>>>> processing date/time rounded to the minute + domain name + path
>>>>>>>>
>>>>>>>> So these should be valid below?
>>>>>>>>
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>>>>>
>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>>>>>>
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>>>>>>
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>>>>>>
>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <
>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Don't KeySelectors also need to be deterministic?
>>>>>>>>>
>>>>>>>>> The {@link KeySelector} allows to use deterministic objects for
>>>>>>>>> operations such as reduce, reduceGroup, join, coGroup, etc. *If
>>>>>>>>> invoked multiple times on the same object, the returned key must
>>>>>>>>> be the same.*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>>>>>>
>>>>>>>>> Hi Francesco,  here is the reproducer:
>>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>>>>>>
>>>>>>>>> So, essentially it looks like when there's a high influx of
>>>>>>>>> records produced from the source that the Exception is thrown.
>>>>>>>>>
>>>>>>>>> The key is generated by 3 values: date/time rounded to the minute
>>>>>>>>> and 2 strings.
>>>>>>>>> So you will see keys as follows...
>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar
>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar
>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>>>>>>
>>>>>>>>> The reproducer has a custom source that basically produces a
>>>>>>>>> record in a loop and sleeps for a specified period of milliseconds,
>>>>>>>>> 100ms in this case.
>>>>>>>>> The lower the sleep delay, the faster records are produced and the
>>>>>>>>> more likely the exception is thrown. With a 100ms delay it's always
>>>>>>>>> thrown. Setting a delay of 2000 to 3000ms will guarantee it works.
>>>>>>>>> The original job uses a Kafka Source so it should technically be
>>>>>>>>> able to handle even a couple thousand records per second.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ok it's not my data either. I think it may be a volume issue. I
>>>>>>>>>> have managed to consistently reproduce the error. I'll upload a
>>>>>>>>>> reproducer ASAP.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce
>>>>>>>>>>> it. But the actual job once in a while throws that error. So I'm
>>>>>>>>>>> wondering if maybe one of the records that comes in is not valid,
>>>>>>>>>>> though I do validate prior to getting to the key and window
>>>>>>>>>>> operators.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a
>>>>>>>>>>>> bit and then it threw the error.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my stats
>>>>>>>>>>>>> print as expected.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> your hash code and equals seems correct. Can you post a
>>>>>>>>>>>>>> minimum stream pipeline reproducer using this class?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <
>>>>>>>>>>>>>> java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39
>>>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}.
>>>>>>>>>>>>>>> Unless you're directly using low level state access APIs, this
>>>>>>>>>>>>>>> is most likely caused by non-deterministic shuffle key (hashCode
>>>>>>>>>>>>>>> and equals implementation).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>

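The determinism point raised in the thread can be reproduced without Flink.
The sketch below is an illustration under stated assumptions, not the
reproducer's code: KeyDeterminismSketch, clockBasedKey and eventBasedKey are
hypothetical names, and a fixed java.time.Clock stands in for the system
clock so the two calls can be placed on either side of a minute boundary.
It shows that a key built from the clock can differ between two calls for
the very same click, while a key built only from the click's own fields
cannot.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration only; it mirrors the idea of the thread, not the actual job.
final class KeyDeterminismSketch {

    /** Key that depends on when it is computed (the pattern the replies warn about). */
    static String clockBasedKey(Clock clock, String domain, String path) {
        Instant minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + domain + "|" + path;
    }

    /** Key that depends only on the element, so repeated calls always agree. */
    static String eventBasedKey(String domain, String path) {
        return domain + "|" + path;
    }

    public static void main(String[] args) {
        // Two clocks frozen 100 ms apart, on either side of a minute boundary.
        Clock before = Clock.fixed(Instant.parse("2022-04-07T12:01:59.950Z"), ZoneOffset.UTC);
        Clock after = Clock.fixed(Instant.parse("2022-04-07T12:02:00.050Z"), ZoneOffset.UTC);

        String first = clockBasedKey(before, "cnn.com", "some-article-name");
        String second = clockBasedKey(after, "cnn.com", "some-article-name");
        System.out.println("clock-based keys equal: " + first.equals(second));

        String stableFirst = eventBasedKey("cnn.com", "some-article-name");
        String stableSecond = eventBasedKey("cnn.com", "some-article-name");
        System.out.println("event-based keys equal: " + stableFirst.equals(stableSecond));
    }
}
```

This is consistent with the hint in the exception message about a
non-deterministic shuffle key: if the key is computed more than once for the
same record and the results disagree, the record may be attributed to a key
group other than the one it was routed to.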