Hi Hequn and Fabian,
Thanks. Appreciate your help

On Mon, Aug 6, 2018 at 1:32 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> By setting the time characteristic to EventTime, you enable the internal
> handling of record timestamps and watermarks.
> In contrast to EventTime, ProcessingTime does not require any additional
> data.
>
> You can use both, EventTime and ProcessingTime in the same application and
> StreamExecutionEnvironment.
> However, if you enable EventTime, this will be the default mode in some
> API methods that create time-based operators and you will need to
> explicitly create ProcessingTime operators if you want to work in
> ProcessingTime.
> For example, the stream.keyBy().timeWindow(Time.minute(1)) shortcut,
> would create an EventTime Tumbling Window if the TimeCharacteristic is set
> to EventTime and a ProcessingTIme Tumbling Window if it is ProcessingTIme.
>
> Best,
> Fabian
>
> 2018-08-06 4:30 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi anna, shyla
>>
>> When we call setStreamTimeCharacteristic(env.setStreamTimeCharacteristic),
>> it means sets the time characteristic for all streams create from this
>> environment. So if your application contains multi environments, then yes.
>>
>> Best, Hequn
>>
>> On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande <deshpandesh...@gmail.com
>> > wrote:
>>
>>> Hi Hequn,
>>>
>>> I now realize that in Production, data will not be a problem since this
>>> will be a high volume kafka topic.
>>> So, I will go with EventTime.
>>>
>>> Still, I would like to know if
>>>
>>> I can use both TimeCharacteristic.ProcessingTime  and 
>>> TimeCharacteristic.EventTime in an application.
>>>
>>> *Thanks, the link you provided saved my time.*
>>>
>>> *-shyla*
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Aug 5, 2018 at 9:28 AM, anna stax <annasta...@gmail.com> wrote:
>>>
>>>> Hi Hequn,
>>>>
>>>> Thanks for link. Looks like I better use ProcessingTime instead of
>>>> EventTime especially because of the 4th reason you listed..
>>>> "Data should cover a longer time span than the window size to advance
>>>> the event time."
>>>> I need the trigger when the data stops.
>>>>
>>>> I have 1 more question.
>>>>
>>>> Can I set the TimeCharacteristic to the stream level instead on the 
>>>> application level?
>>>> Can I use both TimeCharacteristic.ProcessingTime  and 
>>>> TimeCharacteristic.EventTime in an application.
>>>>
>>>> Thank you
>>>>
>>>> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi shyla,
>>>>>
>>>>> I answered a similar question on stackoverflow[1], you can take a look
>>>>> first.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1] https://stackoverflow.com/questions/51691269/event-time-
>>>>> window-in-flink-does-not-trigger
>>>>>
>>>>> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises 
>>>>>> as the basis. I made very minor changes
>>>>>>
>>>>>> and the session window is not triggered. If I use ProcessingTime instead 
>>>>>> of EventTime it works. Here is my code.
>>>>>>
>>>>>> Appreciate any help. Thanks
>>>>>>
>>>>>> object KafkaEventTimeWindow {
>>>>>>
>>>>>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>>>>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>>>>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>>>>>   private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 
>>>>>> seconds
>>>>>>
>>>>>>   def main(args: Array[String]) {
>>>>>>
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>
>>>>>>     val kafkaProps = new Properties
>>>>>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>>>>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>>>>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>>>>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>>>>>
>>>>>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>>>>>       "positionevent",
>>>>>>       new PositionEventProtoSchema,
>>>>>>       kafkaProps)
>>>>>>     consumer.assignTimestampsAndWatermarks(new 
>>>>>> PositionEventProtoTSAssigner)
>>>>>>
>>>>>>     val posstream = env.addSource(consumer)
>>>>>>
>>>>>>     def convtoepochmilli(cdt: String): Long = {
>>>>>>       val  odt:OffsetDateTime = OffsetDateTime.parse(cdt);
>>>>>>       val i:Instant  = odt.toInstant();
>>>>>>       val millis:Long = i.toEpochMilli();
>>>>>>       millis
>>>>>>     }
>>>>>>
>>>>>>     val outputstream = posstream
>>>>>>       .mapWith{case(p) => (p.getConsumerUserId, 
>>>>>> convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
>>>>>>       .keyBy(0)
>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>>>>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }
>>>>>>
>>>>>>     outputstream.print()
>>>>>>
>>>>>>     // execute the transformation pipeline
>>>>>>     env.execute("Output Stream")
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> class PositionEventProtoTSAssigner
>>>>>>   extends 
>>>>>> BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60))
>>>>>>  {
>>>>>>
>>>>>>   override def extractTimestamp(pos: PositionEventProto): Long = {
>>>>>>     val  odt:OffsetDateTime = 
>>>>>> OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
>>>>>>     val i:Instant  = odt.toInstant();
>>>>>>     val millis:Long = i.toEpochMilli();
>>>>>>     millis
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to