Maybe there's a misunderstanding. Basically, I want to do a clickstream count for a given "url", and for simplicity and accuracy of the count I want to base it on processing time (event time doesn't matter as long as I get a total of clicks at that given processing time).
So, regardless of the event time, I want all clicks for the current processing time, rounded to the minute, per link. If now were 2022-04-07T12:01:00.000Z, then I would want the following result...

2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
....
2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10

And so on...

@Override
public MyEventCountKey getKey(final MyEvent click) throws Exception {
    MyEventCountKey key = new MyEventCountKey(
        Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")), ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
        click.getDomain(), // cnn.com
        click.getPath() // /some-article-name
    );
    return key;
}

On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> wrote:

> The key selector works.
>
>
> No it does not ;) It depends on the system time so it's not deterministic
> (you can get different keys for the very same element).
>
> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>
> This is what the windowing is for. You basically want to group / combine
> elements per key and event time window [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>
> Best,
> D.
>
> On Mon, Feb 7, 2022 at 3:44 PM John Smith <java.dev....@gmail.com> wrote:
>
>> The key selector works. It only causes an issue if there are too many keys
>> produced in one shot. For example, if 100 "same" keys are produced for that
>> 1 minute it's ok. But if 101 are produced the error happens.
>>
>>
>> If you look at the reproducer at least that's what's happening
>>
>> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>> The key is that particular time for that particular URL path.
>>
>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>
>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, <ches...@apache.org>
>> wrote:
>>
>>> Your Key selector doesn't need to implement hashCode, but given the same
>>> object it has to return the same key.
>>> In your reproducer the returned key will have different timestamps, and
>>> since the timestamp is included in the hashCode, they will be different
>>> each time.
>>>
>>> On 07/02/2022 14:50, John Smith wrote:
>>>
>>> I don't get it? I provided the reproducer. I implemented the interface
>>> to Key selector it needs hashcode and equals as well?
>>>
>>> I'm attempting to do click stream. So the key is based on processing
>>> date/time rounded to the minute + domain name + path
>>>
>>> So these should be valid below?
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>
>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, <ches...@apache.org>
>>> wrote:
>>>
>>>> Don't KeySelectors also need to be deterministic?
>>>>
>>>> The {@link KeySelector} allows to use deterministic objects for
>>>> operations such as reduce, reduceGroup, join, coGroup, etc. If invoked
>>>> multiple times on the same object, the returned key must be the same.
>>>>
>>>>
>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>
>>>> Hi Francesco, here is the reproducer:
>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>
>>>> So, essentially it looks like when there's a high influx of records
>>>> produced from the source that the Exception is thrown.
>>>>
>>>> The key is generated by 3 values: date/time rounded to the minute and 2
>>>> strings.
>>>> So you will see keys as follows...
>>>> 2022-02-04T17:20:00Z|foo|bar
>>>> 2022-02-04T17:21:00Z|foo|bar
>>>> 2022-02-04T17:22:00Z|foo|bar
>>>>
>>>> The reproducer has a custom source that basically produces a record in
>>>> a loop and sleeps for a specified period of milliseconds 100ms in this
>>>> case.
>>>> The lower the sleep delay the faster records are produced the more
>>>> chances the exception is thrown. With a 100ms delay it's always thrown.
>>>> Setting a 2000 to 3000ms will guarantee it to work.
>>>> The original job uses a Kafka Source so it should technically be able
>>>> to handle even a couple thousand records per second.
>>>>
>>>>
>>>> On Thu, 3 Feb 2022 at 16:41, John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> Ok it's not my data either. I think it may be a volume issue. I have
>>>>> managed to consistently reproduce the error. I'll upload a reproducer
>>>>> ASAP.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>>>>> the actual job once in a while throws that error. So I'm wondering if
>>>>>> maybe one of the records that comes in is not valid, though I do
>>>>>> validate prior to getting to the key and window operators.
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Actually maybe not because with PrintSinkFunction it ran for a bit
>>>>>>> and then it threw the error.
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>>>
>>>>>>>> If I use PrintSinkFunction then I get no error and my stats print
>>>>>>>> as expected.
>>>>>>>>
>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>>>> france...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> your hash code and equals seems correct. Can you post a minimum
>>>>>>>>> stream pipeline reproducer using this class?
>>>>>>>>>
>>>>>>>>> FG
>>>>>>>>>
>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
>>>>>>>>>> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless
>>>>>>>>>> you're directly using low level state access APIs, this is most
>>>>>>>>>> likely caused by non-deterministic shuffle key (hashCode and
>>>>>>>>>> equals implementation).
>>>>>>>>>>
>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>
>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>     private final String domain;
>>>>>>>>>>     private final String event;
>>>>>>>>>>
>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>         this.event = event;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>         return countDateTime;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>         return domain;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public String getEven() {
>>>>>>>>>>         return event;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public String toString() {
>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>         int result = 1;
>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>         return result;
>>>>>>>>>>     }
>>>>>>>>>> }
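
The determinism point raised in the replies can be illustrated without Flink. What follows is a minimal plain-Java sketch, not code from the reproducer: Click, clockBasedKey and elementOnlyKey are hypothetical names, and a fixed java.time.Clock stands in for the wall clock. It shows the very same element producing two different keys when the key embeds the current minute, while a key built only from the element's own fields stays the same.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration only; none of these names come from the reproducer.
final class KeyDeterminismSketch {

    // Stand-in for the click event discussed in the thread.
    static final class Click {
        final String domain;
        final String path;

        Click(String domain, String path) {
            this.domain = domain;
            this.path = path;
        }
    }

    // Key that embeds the current minute read from a clock.
    // The result depends on when it is called, not only on the element.
    static String clockBasedKey(Click click, Clock clock) {
        Instant minute = Instant.now(clock).truncatedTo(ChronoUnit.MINUTES);
        return minute + "|" + click.domain + "|" + click.path;
    }

    // Key derived only from the element's own fields, so repeated calls agree.
    static String elementOnlyKey(Click click) {
        return click.domain + "|" + click.path;
    }

    public static void main(String[] args) {
        Click click = new Click("cnn.com", "/some-article-name");

        // Two fixed clocks simulate the key being computed twice for the same
        // element, one second apart, across a minute boundary.
        Clock first = Clock.fixed(Instant.parse("2022-04-07T12:01:59Z"), ZoneOffset.UTC);
        Clock second = Clock.fixed(Instant.parse("2022-04-07T12:02:00Z"), ZoneOffset.UTC);

        System.out.println(clockBasedKey(click, first));
        System.out.println(clockBasedKey(click, second));
        System.out.println(elementOnlyKey(click));
    }
}
```

With a key like elementOnlyKey, the per-minute grouping would presumably be left to a window on the keyed stream, which is what the windowing documentation linked in the thread covers.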