Hi Devin,

for event-time based windows, you need to give Flink two kinds of
information:
- the timestamp of each record, which I assume in your case is already
embedded in the Pulsar records,
- and a watermark assigner.

The watermarks tell Flink when a window can be closed with respect to
out-of-order and late events. This is highly use-case-specific and usually
cannot be inferred automatically, so you need to specify a watermark
assigner for event-time windows to work. Pulsar offers an API similar to
Kafka's, so you can refer to the respective Kafka documentation [1]. The
other sections of that page give a more general overview of the options,
which may be interesting for future use cases where you want to aggregate
event-time-based records.

Best,

Arvid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

On Tue, Dec 10, 2019 at 9:45 PM Devin Bost <devin.b...@gmail.com> wrote:

> I confirmed that I got no output after 20 seconds, and also none after
> sending additional data, even when waiting over a minute between batches
> of data.
>
> My code looks like this:
>
> PulsarSourceBuilder<String> builder = PulsarSourceBuilder
>       .builder(new SimpleStringSchema())
>       .serviceUrl(SERVICE_URL)
>       .topic(INPUT_TOPIC)
>       .subscriptionName(SUBSCRIPTION_NAME);
> SourceFunction<String> src = builder.build();
> DataStream<String> dataStream = env.addSource(src);
>
> DataStream<String> combinedEnvelopes = dataStream
>       .map(new MapFunction<String, Tuple2<String, String>>() {
>          @Override
>          public Tuple2<String, String> map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>          }
>       })
>       .keyBy(0)
>       //.timeWindow(Time.seconds(5))
>       .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>       .aggregate(new JsonConcatenator());
> //dataStream.print();
>
> Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> logger.info("Ran dataStream. Adding sink next");
> combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
>       SERVICE_URL,
>       OUTPUT_TOPIC,
>       new AuthenticationDisabled(), // probably need to fix: AuthenticationTls()
>       combinedData -> combinedData.toString().getBytes(UTF_8),
>       combinedData -> "test")
> );
> logger.info("Added sink. Executing job.");
> // execute program
> env.execute("Flink Streaming Java API Skeleton");
>
>
> Here is the JsonConcatenator class:
>
> private static class JsonConcatenator
>       implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>
>    @Override
>    public Tuple2<String, String> createAccumulator() {
>       return new Tuple2<String, String>("", "");
>    }
>
>    @Override
>    public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
>       logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>       return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>    }
>
>    @Override
>    public String getResult(Tuple2<String, String> accumulator) {
>       logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>       return "[" + accumulator.f1.substring(1) + "]";
>    }
>
>    @Override
>    public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
>       // Merge is applied when you allow lateness.
>       logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>       if (b.f1.charAt(0) == '[') {
>          logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
>          b.f1 = b.f1.substring(1);
>       }
>       return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>    }
> }
>
>
> Devin G. Bost
>
> Re:
>
> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>> However, for EventTimeSessionWindows you need actual gaps in the data.
>> Can you verify that there is really a 20-second pause in between data
>> points for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>> Best,
>> Arvid
>
>
> On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>>
>> However, for EventTimeSessionWindows you need actual gaps in the data.
>> Can you verify that there is really a 20-second pause in between data
>> points for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>>
>> Best,
>>
>> Arvid
>>
>> On Mon, Dec 9, 2019 at 8:51 AM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi dev,
>>>
>>> The time parameter of a window can have different semantics.
>>> For a session window, it is only the gap between events; the size of the
>>> window is driven by the activity in the stream.
>>> For a tumbling or sliding window, it is the size of the window itself.
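To make the difference concrete, here is a plain-Java sketch (not Flink's implementation; the names are invented): a tumbling window's start is a pure function of the timestamp and the size, while a session window's end is pushed forward by each new event.

```java
// Sketch of the two window-time semantics. Illustration only.
class WindowSemanticsSketch {
    // Tumbling windows: fixed size; the window start is a pure function of the timestamp.
    static long tumblingWindowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Session windows: each event pushes the window end out to its timestamp + gap.
    static long extendedSessionEnd(long currentEndMillis, long timestampMillis, long gapMillis) {
        return Math.max(currentEndMillis, timestampMillis + gapMillis);
    }
}
```

So a 5-second tumbling window always closes at a fixed boundary, whereas a 5-second session window never closes as long as events keep arriving less than 5 seconds apart.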
>>>
>>> For more details, please see the official documentation.[1]
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>>
>>>
>>>
devinbost <devin.b...@gmail.com> wrote on Fri, Dec 6, 2019 at 10:39 PM:
>>>
>>>> I think there might be a bug in
>>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>>>  (unless I'm just not using it correctly) because I'm able to get output
>>>> when I use the simpler window
>>>> `.timeWindow(Time.seconds(5))`
>>>> However, I don't get any output when I use the session-based window.
>>>>
>>>>
>>>> devinbost wrote
>>>> > I added logging statements everywhere in my code, and I'm able to see
>>>> my
>>>> > message reach the `add` method in the AggregateFunction that I
>>>> > implemented,
>>>> > but the getResult method is never called.
>>>> >
>>>> > In the code below, I also never see the:
>>>> >  "Ran dataStream. Adding sink next"
>>>> > line appear in my log, and the only log statements from the
>>>> > JsonConcatenator
>>>> > class come from the `add` method, as shown below.
>>>> >
>>>> >
>>>> > DataStream<String> combinedEnvelopes = dataStream
>>>> >     .map(new MapFunction<String, Tuple2<String, String>>() {
>>>> >         @Override
>>>> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
>>>> >             return mapToTuple(incomingMessage);
>>>> >         }
>>>> >     })
>>>> >     .keyBy(0)
>>>> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>> >     .aggregate(new JsonConcatenator());
>>>> >
>>>> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
>>>> > logger.info("Ran dataStream. Adding sink next");
>>>> >
>>>> > -------------
>>>> >
>>>> > private static class JsonConcatenator
>>>> >         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>>>> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> createAccumulator() {
>>>> >         return new Tuple2<String, String>("", "");
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
>>>> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>>>> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public String getResult(Tuple2<String, String> accumulator) {
>>>> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>>>> >         return "[" + accumulator.f1 + "]";
>>>> >     }
>>>> >
>>>> >     @Override
>>>> >     public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
>>>> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>>>> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>>>> >     }
>>>> > }
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > Any ideas?
>>>> >
>>>> >
>>>> > Chris Miller-2 wrote
>>>> >> I hit the same problem. As far as I can tell, it should be fixed in
>>>> >> Pulsar 2.4.2. The release has already passed voting, so it should
>>>> >> be available in a day or two.
>>>> >>
>>>> >> https://github.com/apache/pulsar/pull/5068
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Sent from:
>>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
