Hi Hequn, Thanks for link. Looks like I better use ProcessingTime instead of EventTime especially because of the 4th reason you listed.. "Data should cover a longer time span than the window size to advance the event time." I need the trigger when the data stops.
I have 1 more question. Can I set the TimeCharacteristic to the stream level instead on the application level? Can I use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application. Thank you On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com> wrote: > Hi shyla, > > I answered a similar question on stackoverflow[1], you can take a look > first. > > Best, Hequn > > [1] https://stackoverflow.com/questions/51691269/event-time- > window-in-flink-does-not-trigger > > On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com > > wrote: > >> Hi, >> >> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as >> the basis. I made very minor changes >> >> and the session window is not triggered. If I use ProcessingTime instead of >> EventTime it works. Here is my code. >> >> Appreciate any help. Thanks >> >> object KafkaEventTimeWindow { >> >> private val LOCAL_ZOOKEEPER_HOST = "localhost:2181" >> private val LOCAL_KAFKA_BROKER = "localhost:9092" >> private val CON_GROUP = "KafkaEventTimeSessionWindow" >> private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 >> seconds >> >> def main(args: Array[String]) { >> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> >> val kafkaProps = new Properties >> kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST) >> kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER) >> kafkaProps.setProperty("group.id", CON_GROUP) >> kafkaProps.setProperty("auto.offset.reset", "earliest") >> >> val consumer = new FlinkKafkaConsumer011[PositionEventProto]( >> "positionevent", >> new PositionEventProtoSchema, >> kafkaProps) >> consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner) >> >> val posstream = env.addSource(consumer) >> >> def convtoepochmilli(cdt: String): Long = { >> val odt:OffsetDateTime = OffsetDateTime.parse(cdt); >> val i:Instant = odt.toInstant(); >> val millis:Long = i.toEpochMilli(); >> millis >> } >> >> val outputstream = posstream >> .mapWith{case(p) => (p.getConsumerUserId, >> convtoepochmilli(p.getCreateDateTime.getInIso8601Format))} >> .keyBy(0) >> .window(EventTimeSessionWindows.withGap(Time.seconds(60))) >> .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) } >> >> outputstream.print() >> >> // execute the transformation pipeline >> env.execute("Output Stream") >> } >> >> } >> >> class PositionEventProtoTSAssigner >> extends >> BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) >> { >> >> override def extractTimestamp(pos: PositionEventProto): Long = { >> val odt:OffsetDateTime = >> OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format); >> val i:Instant = odt.toInstant(); >> val millis:Long = i.toEpochMilli(); >> millis >> } >> } >> >> >> >