Hi Hequn,

Thanks for the link. It looks like I had better use ProcessingTime instead
of EventTime, especially because of the 4th reason you listed:
"Data should cover a longer time span than the window size to advance the
event time."
I need the trigger to fire when the data stops.

I have 1 more question.

Can I set the TimeCharacteristic at the stream level instead of at the
application level?
Can I use both TimeCharacteristic.ProcessingTime and
TimeCharacteristic.EventTime in one application?

Thank you

On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi shyla,
> I answered a similar question on stackoverflow[1], you can take a look
> first.
> Best, Hequn
> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com
> > wrote:
>> Hi,
>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as 
>> the basis. I made very minor changes
>> and the session window is not triggered. If I use ProcessingTime instead of 
>> EventTime it works. Here is my code.
>> Appreciate any help. Thanks
>> /**
>>  * Reads PositionEventProto records from the Kafka topic "positionevent"
>>  * and prints, per consumer user id, the largest creation timestamp
>>  * (epoch millis) seen in each 60-second event-time session window.
>>  *
>>  * NOTE: an event-time session is emitted only once the watermark passes
>>  * the end of the session. PositionEventProtoTSAssigner keeps the
>>  * watermark 60 s behind the newest timestamp, so a session fires only
>>  * after a later event roughly 120 s newer than its last element has
>>  * arrived. If the input simply stops, the last session is not emitted.
>>  */
>> object KafkaEventTimeWindow {
>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>   // Events are out of order by at most 60 seconds. Not referenced below:
>>   // PositionEventProtoTSAssigner hard-codes the same 60 s bound.
>>   private val MAX_EVENT_DELAY = 60
>>   /** Builds the Kafka -> session-window -> print pipeline and runs it. */
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     val kafkaProps = new Properties
>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>       "positionevent",
>>       new PositionEventProtoSchema,
>>       kafkaProps)
>>     // Timestamps and watermarks are assigned inside the Kafka source.
>>     consumer.assignTimestampsAndWatermarks(
>>       new PositionEventProtoTSAssigner)
>>     val posstream = env.addSource(consumer)
>>     // Converts an ISO-8601 offset date-time string to epoch millis.
>>     def convtoepochmilli(cdt: String): Long =
>>       OffsetDateTime.parse(cdt).toInstant.toEpochMilli
>>     val outputstream = posstream
>>       .mapWith { p =>
>>         (p.getConsumerUserId,
>>           convtoepochmilli(p.getCreateDateTime.getInIso8601Format))
>>       }
>>       // Typed key selector instead of the positional keyBy(0).
>>       .keyBy(_._1)
>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>       // Keep the key and the latest timestamp seen in the session.
>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }
>>     outputstream.print()
>>     // execute the transformation pipeline
>>     env.execute("Output Stream")
>>   }
>> }
>> /**
>>  * Timestamp assigner for PositionEventProto records. Passes a fixed
>>  * 60-second bound to BoundedOutOfOrdernessTimestampExtractor.
>>  */
>> class PositionEventProtoTSAssigner
>>   extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](
>>     Time.seconds(60)) {
>>   /** Parses the event's ISO-8601 creation time into epoch millis. */
>>   override def extractTimestamp(pos: PositionEventProto): Long = {
>>     val createdAt = pos.getCreateDateTime.getInIso8601Format
>>     OffsetDateTime.parse(createdAt).toInstant.toEpochMilli
>>   }
>> }

