Hi shyla,

I answered a similar question on stackoverflow[1], you can take a look
first.

Best, Hequn

[1]
https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger

On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Hi,
>
> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as 
> the basis. I made very minor changes
>
> and the session window is not triggered. If I use ProcessingTime instead of 
> EventTime it works. Here is my code.
>
> Appreciate any help. Thanks
>
> object KafkaEventTimeWindow {
>
>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>   private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 
> seconds
>
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val kafkaProps = new Properties
>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>     kafkaProps.setProperty("group.id", CON_GROUP)
>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>
>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>       "positionevent",
>       new PositionEventProtoSchema,
>       kafkaProps)
>     consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)
>
>     val posstream = env.addSource(consumer)
>
>     def convtoepochmilli(cdt: String): Long = {
>       val  odt:OffsetDateTime = OffsetDateTime.parse(cdt);
>       val i:Instant  = odt.toInstant();
>       val millis:Long = i.toEpochMilli();
>       millis
>     }
>
>     val outputstream = posstream
>       .mapWith{case(p) => (p.getConsumerUserId, 
> convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
>       .keyBy(0)
>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }
>
>     outputstream.print()
>
>     // execute the transformation pipeline
>     env.execute("Output Stream")
>   }
>
> }
>
> class PositionEventProtoTSAssigner
>   extends 
> BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) 
> {
>
>   override def extractTimestamp(pos: PositionEventProto): Long = {
>     val  odt:OffsetDateTime = 
> OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
>     val i:Instant  = odt.toInstant();
>     val millis:Long = i.toEpochMilli();
>     millis
>   }
> }
>
>
>

Reply via email to