Hi Hequn,

I now realize that in production, data will not be a problem since this
will be a high-volume Kafka topic, so I will go with EventTime.

Still, I would like to know whether I can use both
TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in the
same application.
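
To make the question concrete, here is a rough sketch of what I have in
mind (my own code, not from your link; Flink 1.x Scala API, with
placeholder sources):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object MixedTimeCharacteristics {

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The characteristic is set once, at the application (environment) level.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Placeholder streams of (key, timestamp) pairs.
    val streamA: DataStream[(String, Long)] = env.fromElements(("a", 1L))
    val streamB: DataStream[(String, Long)] = env.fromElements(("b", 2L))

    // Event-time session windows (streamA would also need
    // assignTimestampsAndWatermarks in a real job).
    streamA
      .keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }
      .print()

    // Processing-time session windows on another stream, even though the
    // environment default is EventTime.
    streamB
      .keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }
      .print()

    env.execute("Mixed time characteristics")
  }
}

Is mixing window assigners like this the supported way, or is there a
per-stream setting I am missing?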

Thanks, the link you provided saved me time.

*-shyla*

On Sun, Aug 5, 2018 at 9:28 AM, anna stax <annasta...@gmail.com> wrote:

> Hi Hequn,
>
> Thanks for the link. It looks like I should use ProcessingTime instead of
> EventTime, especially because of the 4th reason you listed:
> "Data should cover a longer time span than the window size to advance the
> event time."
> I need the window to trigger when the data stops.
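>
> One idea I want to run by you (my own sketch, not something from your
> link): a periodic watermark assigner whose watermark follows the wall
> clock, so that event-time windows can still fire after the source goes
> idle.
>
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.watermark.Watermark
>
> // Watermarks are driven by processing time, so event time keeps advancing
> // even when no new elements arrive. extractTs is a caller-supplied
> // function, not a Flink API.
> class WallClockWatermarks[T](maxDelayMs: Long, extractTs: T => Long)
>   extends AssignerWithPeriodicWatermarks[T] {
>
>   override def extractTimestamp(element: T, previousTimestamp: Long): Long =
>     extractTs(element)
>
>   override def getCurrentWatermark(): Watermark =
>     new Watermark(System.currentTimeMillis() - maxDelayMs)
> }
>
> The trade-off, as far as I can tell, is that any element arriving more
> than maxDelayMs of wall-clock time behind would be considered late.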
>
> I have a couple more questions.
>
> Can I set the TimeCharacteristic at the stream level instead of the
> application level?
> Can I use both TimeCharacteristic.ProcessingTime and
> TimeCharacteristic.EventTime in an application?
>
> Thank you
>
> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi shyla,
>>
>> I answered a similar question on Stack Overflow [1]; you can take a look
>> first.
>>
>> Best, Hequn
>>
>> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>
>> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises
>>> as the basis and made only very minor changes, but the session window is
>>> never triggered. If I use ProcessingTime instead of EventTime, it works.
>>> Here is my code.
>>>
>>> Appreciate any help. Thanks
>>>
>>> object KafkaEventTimeWindow {
>>>
>>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>>   private val MAX_EVENT_DELAY = 60 // events are out of order by at most 60 seconds
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>>     val kafkaProps = new Properties
>>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>>
>>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>>       "positionevent",
>>>       new PositionEventProtoSchema,
>>>       kafkaProps)
>>>     consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)
>>>
>>>     val posstream = env.addSource(consumer)
>>>
>>>     // Convert an ISO-8601 timestamp string to epoch milliseconds.
>>>     def convToEpochMilli(cdt: String): Long =
>>>       OffsetDateTime.parse(cdt).toInstant.toEpochMilli
>>>
>>>     val outputstream = posstream
>>>       .mapWith { case p =>
>>>         (p.getConsumerUserId, convToEpochMilli(p.getCreateDateTime.getInIso8601Format))
>>>       }
>>>       .keyBy(0)
>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }
>>>
>>>     outputstream.print()
>>>
>>>     // execute the transformation pipeline
>>>     env.execute("Output Stream")
>>>   }
>>>
>>> }
>>>
>>> // Extracts the event timestamp and allows events to be out of order by
>>> // at most 60 seconds (the same bound as MAX_EVENT_DELAY above).
>>> class PositionEventProtoTSAssigner
>>>   extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {
>>>
>>>   override def extractTimestamp(pos: PositionEventProto): Long =
>>>     OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format).toInstant.toEpochMilli
>>> }
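>>>
>>> While pasting this I noticed the ISO-8601 conversion appears twice (in
>>> the map function and in the extractor). If it matters, a shared helper
>>> could look like this (just a cleanup sketch on my side):
>>>
>>> import java.time.OffsetDateTime
>>>
>>> object TimeConv {
>>>   // Shared helper: ISO-8601 string -> epoch milliseconds.
>>>   def toEpochMilli(iso: String): Long =
>>>     OffsetDateTime.parse(iso).toInstant.toEpochMilli
>>> }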
>>>
>>>
>>>
>>
>
