Hi Dario, I don't care about event time. I just want a tumbling window
over processing time, i.e. a count of whatever arrived in the last 5 minutes.
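
To make the intent concrete: a processing-time tumbling window buckets each
element by the wall-clock time at which the operator sees it, so no event
timestamps or watermarks are involved. In Flink, I believe the matching
assigner is TumblingProcessingTimeWindows in place of
TumblingEventTimeWindows. The plain-Java sketch below is not Flink code; the
class and method names are hypothetical, and it only illustrates the bucketing
and counting under the assumption that windows are aligned to the epoch.

```java
import java.util.Map;
import java.util.TreeMap;

public class ProcessingTimeTumblingCount {

    // Start of the tumbling window that contains the given wall-clock
    // timestamp, assuming windows are aligned to the epoch (offset 0).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Counts elements per window, keyed by window start, using only the
    // arrival (processing) time of each element.
    static Map<Long, Integer> countPerWindow(long[] arrivalMillis, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : arrivalMillis) {
            counts.merge(windowStart(t, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // Hypothetical arrival times in milliseconds since the epoch.
        long[] arrivals = {0L, 60_000L, 299_999L, 300_000L, 660_000L};
        System.out.println(countPerWindow(arrivals, fiveMinutes));
    }
}
```

Each element lands in exactly one 5-minute bucket, and a bucket is complete as
soon as the wall clock passes its end, which is why nothing like a watermark
is needed in this mode.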

On Mon, 31 Jan 2022 at 17:09, Dario Heinisch <dario.heini...@gmail.com>
wrote:

> Hi John
>
> This is because you are using event time (TumblingEventTimeWindows) but
> you do not have an event-time watermark strategy.
> It is also why I opened: https://issues.apache.org/jira/browse/FLINK-24623
> because I feel like Flink should be throwing an exception in that case
> on startup.
>
> Take a look at the documentation at:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
> which should have everything.
>
> > In order to work with event time, Flink needs to know the events
> > timestamps, meaning each element in the stream needs to have its event
> > timestamp assigned. This is usually done by accessing/extracting the
> > timestamp from some field in the element by using a TimestampAssigner.
> > Timestamp assignment goes hand-in-hand with generating watermarks, which
> > tell the system about progress in event time. You can configure this by
> > specifying a WatermarkGenerator.
>
> Best regards,
>
> Dario
> On 31.01.22 22:28, John Smith wrote:
>
> Hi, I have the following job. I'm expecting the
> System.out.println(key.toString()); to at least print, but nothing prints.
>
> - .flatMap: Fires and prints my debug message once, as expected.
> - .keyBy: Also fires, but prints my debug message twice.
> - .apply: Doesn't seem to fire. The debug statement doesn't seem to print.
> I'm expecting it to print the key from above keyBy.
>
> DataStream<MyEvent> slStream = env
>         .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .uid(kafkaTopic).name(kafkaTopic)
>         .setParallelism(1)
>         .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>         .uid("map-json-logs").name("map-json-logs");
>
> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
>         .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>         .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
>             @Override
>             public void apply(Tuple3<String, String, String> key, TimeWindow window,
>                     Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
>                 // This should print.
>                 System.out.println(key.toString());
>                 // Do nothing for now
>             }
>         })
>         .uid("process").name("process");
>