Hi Devin,

for event-time-based windows, you need to give Flink two types of information:

- the timestamp of each record, which I assume in your case is already embedded into the Pulsar records,
- and a watermark assigner.
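To make the watermark idea concrete, here is a minimal plain-Java sketch (deliberately free of Flink dependencies so it stands alone; the class and method names are mine, not Flink's) of the bounded-out-of-orderness strategy that Flink's `BoundedOutOfOrdernessTimestampExtractor` implements: the watermark trails the highest timestamp seen so far by a fixed bound, so out-of-order records within that bound still land in their window.

```java
// Sketch of bounded-out-of-orderness watermarking, assuming records
// carry an embedded epoch-millisecond event timestamp.
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per record: track the highest timestamp seen so far.
    public long extractTimestamp(long recordTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, recordTimestamp);
        return recordTimestamp;
    }

    // The watermark trails the maximum timestamp by the bound; a window
    // can only fire once the watermark passes the window's end.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(3000);
        wm.extractTimestamp(10_000);
        wm.extractTimestamp(8_000); // out-of-order record does not move the watermark backwards
        System.out.println(wm.currentWatermark()); // prints 7000
    }
}
```

The key property to notice is that the watermark never regresses: the late record at 8,000 ms is still covered because the watermark (7,000 ms) has not yet passed it.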
Watermarks help Flink determine when windows can be closed with respect to out-of-order and late events. This is highly use-case-specific and usually cannot be inferred automatically, so you need to specify a watermark assigner for event-time windows to work. Pulsar offers an API similar to Kafka's, so you can simply refer to the respective documentation [1]. The other sections of that page give a more general overview of the options, which may be interesting for future use cases where you want to aggregate event-time-based records.

Best,

Arvid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

On Tue, Dec 10, 2019 at 9:45 PM Devin Bost <devin.b...@gmail.com> wrote:

> I did confirm that I got no resulting output after 20 seconds and after
> sending additional data after waiting over a minute between batches of data.
>
> My code looks like this:
>
> PulsarSourceBuilder<String> builder = PulsarSourceBuilder
>     .builder(new SimpleStringSchema())
>     .serviceUrl(SERVICE_URL)
>     .topic(INPUT_TOPIC)
>     .subscriptionName(SUBSCRIPTION_NAME);
> SourceFunction<String> src = builder.build();
> DataStream<String> dataStream = env.addSource(src);
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, String>>() {
>         @Override
>         public Tuple2 map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     //.timeWindow(Time.seconds(5))
>     .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>     .aggregate(new JsonConcatenator());
> //dataStream.print();
>
> Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> logger.info("Ran dataStream. Adding sink next");
> combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
>     SERVICE_URL,
>     OUTPUT_TOPIC,
>     new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
>     combinedData -> combinedData.toString().getBytes(UTF_8),
>     combinedData -> "test")
> );
> logger.info("Added sink. Executing job.");
> // execute program
> env.execute("Flink Streaming Java API Skeleton");
>
> Here is the JsonConcatenator class:
>
> private static class JsonConcatenator
>         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>
>     @Override
>     public Tuple2<String, String> createAccumulator() {
>         return new Tuple2<String, String>("", "");
>     }
>
>     @Override
>     public Tuple2<String, String> add(Tuple2<String, String> value,
>             Tuple2<String, String> accumulator) {
>         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>     }
>
>     @Override
>     public String getResult(Tuple2<String, String> accumulator) {
>         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>         return "[" + accumulator.f1.substring(1) + "]";
>     }
>
>     @Override
>     public Tuple2<String, String> merge(Tuple2<String, String> a,
>             Tuple2<String, String> b) {
>         // Merge is applied when you allow lateness.
>         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>         if (b.f1.charAt(0) == '[') {
>             logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
>             b.f1 = b.f1.substring(1);
>         }
>         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>     }
> }
>
> Devin G. Bost
>
> Re:
>
>> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>> However, for EventTimeSessionWindows you need to have gaps in the data.
>> Can you verify that there is actually a 20-second pause in between data points
>> for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>> Best,
>> Arvid

On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <ar...@ververica.com> wrote:

>> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>>
>> However, for EventTimeSessionWindows you need to have gaps in the data.
>> Can you verify that there is actually a 20-second pause in between data points
>> for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>>
>> Best,
>>
>> Arvid
>>
>> On Mon, Dec 9, 2019 at 8:51 AM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi dev,
>>>
>>> The time of a window can have different semantics.
>>> In a session window, it is only a time gap; the size of the window is
>>> driven by activity events.
>>> In a tumbling or sliding window, it is the size of the window.
>>>
>>> For more details, please see the official documentation. [1]
>>>
>>> Best,
>>> Vino
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>>
>>> devinbost <devin.b...@gmail.com> wrote on Friday, December 6, 2019 at 10:39 PM:
>>>
>>>> I think there might be a bug in
>>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>>> (unless I'm just not using it correctly), because I'm able to get output
>>>> when I use the simpler window
>>>> `.timeWindow(Time.seconds(5))`.
>>>> However, I don't get any output when I use the session-based window.
>>>>
>>>> devinbost wrote:
>>>> > I added logging statements everywhere in my code, and I'm able to see my
>>>> > message reach the `add` method in the AggregateFunction that I implemented,
>>>> > but the getResult method is never called.
>>>> >
>>>> > In the code below, I also never see the
>>>> > "Ran dataStream. Adding sink next"
>>>> > line appear in my log, and the only log statements from the JsonConcatenator
>>>> > class come from the `add` method, as shown below.
>>>> >
>>>> > DataStream<String> combinedEnvelopes = dataStream
>>>> >     .map(new MapFunction<String, Tuple2<String, String>>() {
>>>> >         @Override
>>>> >         public Tuple2 map(String incomingMessage) throws Exception {
>>>> >             return mapToTuple(incomingMessage);
>>>> >         }
>>>> >     })
>>>> >     .keyBy(0)
>>>> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>> >     .aggregate(new JsonConcatenator());
>>>> >
>>>> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
>>>> > logger.info("Ran dataStream. Adding sink next");
>>>> >
>>>> > -------------
>>>> >
>>>> > private static class JsonConcatenator
>>>> >         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>>>> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> createAccumulator() {
>>>> >         return new Tuple2<String, String>("", "");
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> add(Tuple2<String, String> value,
>>>> >             Tuple2<String, String> accumulator) {
>>>> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>>>> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public String getResult(Tuple2<String, String> accumulator) {
>>>> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>>>> >         return "[" + accumulator.f1 + "]";
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> merge(Tuple2<String, String> a,
>>>> >             Tuple2<String, String> b) {
>>>> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>>>> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>>>> >     }
>>>> > }
>>>> >
>>>> > Any ideas?
>>>> >
>>>> > Chris Miller-2 wrote:
>>>> >> I hit the same problem. As far as I can tell, it should be fixed in
>>>> >> Pulsar 2.4.2. The release has already passed voting, so I hope it
>>>> >> should be available in a day or two.
>>>> >>
>>>> >> https://github.com/apache/pulsar/pull/5068
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
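The session-window behavior discussed throughout this thread can be distilled into a short, self-contained sketch. This is not Flink code (all names here are mine, and a real Flink job additionally needs watermarks before a session can fire); it only illustrates the gap rule that `EventTimeSessionWindows.withGap` applies: consecutive events belong to the same session unless they are separated by more than the gap.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of session-window-by-gap semantics: a new session
// starts whenever the gap to the previous event exceeds gapMs.
public class SessionSplitter {
    public static List<List<Long>> split(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedTimestamps) {
            if (previous != null && ts - previous > gapMs) {
                sessions.add(current); // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // With a 5-second gap, events at 0s, 2s, and 3s form one session
        // and the event at 10s starts another.
        List<Long> events = List.of(0L, 2_000L, 3_000L, 10_000L);
        System.out.println(split(events, 5_000)); // [[0, 2000, 3000], [10000]]
    }
}
```

This also shows why a steady stream with no pauses never produces output from a session window: without a gap larger than `gapMs`, the session never ends, and `getResult` is never reached.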