Hi Soheil,

Hequn is right. This might be an issue with advancing event-time.
You can monitor that by checking the watermarks in the web dashboard or
print-debug it with a ProcessFunction which can lookup the current
watermark.

Best, Fabian

2018-07-19 3:30 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:

> Hi Soheil,
>
> > wait 8 milliseconds (according to my code) to see if any other data
> with the same key received or not and after 8 millisecond it will be
> triggered.
> Yes, but the time is event time, so if there is no data from source the
> time won't be advanced.
>
> There are some reasons why the event time has not been advanced:
> 1. There are no data from the source
> 2. One of the source parallelisms doesn't have data
> 3. The time field, i.e, Long in Tuple3, should be millisecond instead of
> second.
> 4. Data should cover a longer time spam than the window size to advance
> the event time.
>
> Best, Hequn
>
> On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani <soheil.i...@gmail.com
> > wrote:
>
>> Hi,
>>
>> In a datastream processing problem, the source generated data every 8
>> millisecond and timestamp is a field of the data. In default Flink time
>> behavior data enter the time window but when I set Flink time to EventTime
>> it will output nothing! Here is the code:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> SingleOutputStreamOperator<Tuple3<String,Long, JSONObject>> res = 
>> aggregatedTuple
>>                 .assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, 
>> JSONObject>>(Time.milliseconds(8)) {
>>
>>             @Override
>>             public long extractTimestamp(Tuple3<String, Long, JSONObject> 
>> element) {
>>                 return element.f1 ;
>>             }
>>         }).keyBy(1).timeWindow(Time.milliseconds(8))
>>                 .allowedLateness(Time.milliseconds(3))
>>                 .sideOutputLateData(lateOutputTag)
>>                 .reduce(processing...);
>>         DataStream<Tuple3<String, Long, JSONObject>> lateData = 
>> res.getSideOutput(lateOutputTag);
>>         res.print();
>>
>> What is the problem with my code?
>> According to the Flink documents, my understanding about EventTime is
>> that for example in case of time window when a new data received it start a
>> new (logical window) based on new data event timestamp and wait 8
>> milliseconds (according to my code) to see if any other data with the same
>> key received or not and after 8 millisecond (from timestamp of the first
>> element of the window) it will be triggered. Since data source generated
>> data in a constant periodic interval, I set a watermarck of  8, too. Is my
>> understanding about Flink window in EventTime correct?
>>
>
>

Reply via email to