Hi Hequn and Fabian,

Thanks. Appreciate your help.

On Mon, Aug 6, 2018 at 1:32 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> By setting the time characteristic to EventTime, you enable the internal
> handling of record timestamps and watermarks.
> In contrast to EventTime, ProcessingTime does not require any additional
> data.
>
> You can use both EventTime and ProcessingTime in the same application and
> StreamExecutionEnvironment.
> However, if you enable EventTime, this will be the default mode in some
> API methods that create time-based operators, and you will need to
> explicitly create ProcessingTime operators if you want to work in
> ProcessingTime.
> For example, the stream.keyBy().timeWindow(Time.minutes(1)) shortcut
> would create an EventTime Tumbling Window if the TimeCharacteristic is set
> to EventTime and a ProcessingTime Tumbling Window if it is ProcessingTime.
>
> Best,
> Fabian
>
> 2018-08-06 4:30 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> Hi anna, shyla
>>
>> When we call setStreamTimeCharacteristic(env.setStreamTimeCharacteristic),
>> it sets the time characteristic for all streams created from this
>> environment. So if your application contains multiple environments, then yes.
>>
>> Best, Hequn
>>
>> On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande <deshpandesh...@gmail.com>
>> wrote:
>>
>>> Hi Hequn,
>>>
>>> I now realize that in Production, data will not be a problem since this
>>> will be a high volume kafka topic.
>>> So, I will go with EventTime.
>>>
>>> Still, I would like to know if
>>>
>>> I can use both TimeCharacteristic.ProcessingTime and
>>> TimeCharacteristic.EventTime in an application.
>>>
>>> *Thanks, the link you provided saved my time.*
>>>
>>> *-shyla*
>>>
>>> On Sun, Aug 5, 2018 at 9:28 AM, anna stax <annasta...@gmail.com> wrote:
>>>
>>>> Hi Hequn,
>>>>
>>>> Thanks for the link. Looks like I better use ProcessingTime instead of
>>>> EventTime, especially because of the 4th reason you listed:
>>>> "Data should cover a longer time span than the window size to advance
>>>> the event time."
>>>> I need the trigger when the data stops.
>>>>
>>>> I have 1 more question.
>>>>
>>>> Can I set the TimeCharacteristic at the stream level instead of at the
>>>> application level?
>>>> Can I use both TimeCharacteristic.ProcessingTime and
>>>> TimeCharacteristic.EventTime in an application?
>>>>
>>>> Thank you
>>>>
>>>> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi shyla,
>>>>>
>>>>> I answered a similar question on stackoverflow[1], you can take a look
>>>>> first.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>>>>
>>>>> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises
>>>>>> as the basis. I made very minor changes
>>>>>>
>>>>>> and the session window is not triggered. If I use ProcessingTime instead
>>>>>> of EventTime it works. Here is my code.
>>>>>>
>>>>>> Appreciate any help.
>>>>>> Thanks
>>>>>>
>>>>>> object KafkaEventTimeWindow {
>>>>>>
>>>>>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>>>>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>>>>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>>>>>   private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds
>>>>>>
>>>>>>   def main(args: Array[String]) {
>>>>>>
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>
>>>>>>     val kafkaProps = new Properties
>>>>>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>>>>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>>>>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>>>>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>>>>>
>>>>>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>>>>>       "positionevent",
>>>>>>       new PositionEventProtoSchema,
>>>>>>       kafkaProps)
>>>>>>     consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)
>>>>>>
>>>>>>     val posstream = env.addSource(consumer)
>>>>>>
>>>>>>     def convtoepochmilli(cdt: String): Long = {
>>>>>>       val odt:OffsetDateTime = OffsetDateTime.parse(cdt);
>>>>>>       val i:Instant = odt.toInstant();
>>>>>>       val millis:Long = i.toEpochMilli();
>>>>>>       millis
>>>>>>     }
>>>>>>
>>>>>>     val outputstream = posstream
>>>>>>       .mapWith{case(p) => (p.getConsumerUserId,
>>>>>>         convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
>>>>>>       .keyBy(0)
>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>>>>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }
>>>>>>
>>>>>>     outputstream.print()
>>>>>>
>>>>>>     // execute the transformation pipeline
>>>>>>     env.execute("Output Stream")
>>>>>>   }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> class PositionEventProtoTSAssigner
>>>>>>   extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {
>>>>>>
>>>>>>   override def extractTimestamp(pos: PositionEventProto): Long = {
>>>>>>     val odt:OffsetDateTime =
>>>>>>       OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
>>>>>>     val i:Instant = odt.toInstant();
>>>>>>     val millis:Long = i.toEpochMilli();
>>>>>>     millis
>>>>>>   }
>>>>>> }
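
A note on the point quoted in the thread, "Data should cover a longer time span than the window size to advance the event time": the sketch below is a plain-Scala model of that timing rule, with no Flink dependency. It assumes the usual behaviour of a bounded-out-of-orderness assigner (watermark = highest timestamp seen minus the allowed delay) and of the default event-time trigger (a window fires once the watermark reaches the window end minus 1 ms). The object and method names (SessionWindowTiming, watermark, sessionFired) are hypothetical and only for illustration. The model ignores periodic watermark emission and per-partition watermarks, so treat it as a rough approximation rather than a description of the actual job.

```scala
// Simplified model of the event-time rules discussed in this thread.
// Assumptions (not taken from the thread's code): watermark and trigger rules as
// described in the lead-in; a single source partition; no periodic emission delay.
object SessionWindowTiming {
  val GapMs: Long = 60000L           // mirrors EventTimeSessionWindows.withGap(Time.seconds(60))
  val MaxOutOfOrderMs: Long = 60000L // mirrors the Time.seconds(60) bound of the timestamp assigner

  /** Watermark after the given event timestamps (epoch millis) have been seen. */
  def watermark(seen: Seq[Long]): Long =
    if (seen.isEmpty) Long.MinValue else seen.max - MaxOutOfOrderMs

  /** True if a session whose last event is at `lastEventTs` would have fired. */
  def sessionFired(lastEventTs: Long, seen: Seq[Long]): Boolean = {
    val windowMaxTimestamp = lastEventTs + GapMs - 1 // window end minus 1 ms
    watermark(seen) >= windowMaxTimestamp
  }

  def main(args: Array[String]): Unit = {
    // Three events 10 s apart, then the data stops: the watermark never passes the window end.
    val session = Seq(0L, 10000L, 20000L)
    println(sessionFired(session.max, session)) // false

    // A later event (for any key) at 140 s pushes the watermark to 80 s, past the window end.
    println(sessionFired(session.max, session :+ 140000L)) // true
  }
}
```

Under these assumptions, with a 60 second gap and a 60 second out-of-orderness bound, a session only fires once some event arrives that is roughly 120 seconds newer than the session's last event. That would be consistent with the window never firing on a small test topic where the data simply stops, and with the problem going away on a high-volume topic.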