Re: Flink eventTime question

2020-12-18 Thread Khachatryan Roman
Hi Jiazhi,

Could you share table definitions and both queries?

Regards,
Roman


On Fri, Dec 18, 2020 at 4:39 AM ゞ野蠻遊戲χ  wrote:

> Hi all,
>  When I use SQL with a UDTF and call the tableEnv.sqlQuery() method,
> the following error is thrown: "Rowtime attributes must not be in the
> input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before." I applied the to_timestamp
> function to eventTime, but it doesn't work. How can I solve the problem?
>
> sql: select
>   tmp.metric_id as metric_id,
>   tmp.metric_config as metric_config,
>   startLat,
>   destName,
>   bizType,
>   orderId,
>   completedTime,
>   orderStatus,
>   startHexList,
>   cityId,
>   type,
>   destLat,
>   endHexList,
>   destLng,
>   createTime,
>   passengerId,
>   finishedTime,
>   vehicleId,
>   startLng,
>   startName,
>   eventTime
> from
>   htw_order_dwd_htw_order_geo_Infos,
>   lateral table(
>     metricUdtf('aa')
>   ) as tmp(metric_id, metric_config)
>
> Thanks
> Jiazhi
>


Flink eventTime question

2020-12-17 Thread ゞ野蠻遊戲χ
Hi all,

When I use SQL with a UDTF and call the tableEnv.sqlQuery() method, the
following error is thrown: "Rowtime attributes must not be in the input rows
of a regular join. As a workaround you can cast the time attributes of input
tables to TIMESTAMP before." I applied the to_timestamp function to
eventTime, but it doesn't work. How can I solve the problem?


sql: select
  tmp.metric_id as metric_id,
  tmp.metric_config as metric_config,
  startLat,
  destName,
  bizType,
  orderId,
  completedTime,
  orderStatus,
  startHexList,
  cityId,
  type,
  destLat,
  endHexList,
  destLng,
  createTime,
  passengerId,
  finishedTime,
  vehicleId,
  startLng,
  startName,
  eventTime
from
  htw_order_dwd_htw_order_geo_Infos,
  lateral table(
    metricUdtf('aa')
  ) as tmp(metric_id, metric_config)


Thanks
Jiazhi

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-03 Thread aj
Thanks, Robert, for mentioning this. I will take care of it in future posts.

I was able to figure out the issue. When I disable checkpointing, the
watermark gets updated and it works. I need to understand two things:

1. What happens when I enable checkpointing, and how can I make this work
with checkpointing enabled? I need to write the data stream with
checkpointing enabled.

2. I want to collect all the data of a session and process all of its events
at the end of the session (after x minutes of inactivity).
I know this functionality is available in the session window, where I can
create a session window using an inactivity period, but enrichment and
processing of events there is not recommended. So how can I use the same
functionality to trigger on the inactivity period, process all the
events, and clear the queue?


Thanks,
Anuj


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger  wrote:

> side note: this question has been asked on SO as well:
> https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
>
> On Mon, Mar 2, 2020 at 5:36 PM aj  wrote:
>
>> Hi Dawid,
>>
>> Currently, I am testing with a single source and parallelism 1 only, so
>> I cannot explain this behavior.
>>
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Anuj,
>>>
>>> What parallelism has your source? Do all of your source tasks produce
>>> records? Watermark is always the minimum of timestamps seen from all the
>>> upstream operators. Therefore if some of them do not produce records the
>>> watermark will not progress. You can read more about Watermarks and how
>>> they work here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>>
>>> Hope that helps
>>>
>>> Best,
>>>
>>> Dawid
>>> On 02/03/2020 16:26, aj wrote:
>>>
>>> I am trying to use process function to some processing on a set of
>>> events. I am using event time and keystream. The issue I am facing is The
>>> watermark value is always coming as 9223372036854725808. I have put print
>>> statement to debug and it shows like this:
>>>
>>> timestamp--1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp and extracted timestamp changing but watermark not getting
>>> updated. So no record is getting in the queue as context.timestamp is never
>>> less than the watermark.
>>>
>>>
>>> DataStream dataStream = 
>>> env.addSource(searchConsumer).name("search_list_keyless");
>>> DataStream dataStreamWithWaterMark =  
>>> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>
>>>try {
>>> dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>>> StringBuilder builder = new StringBuilder();
>>> builder.append(record.get("session_id"));
>>> builder.append(record.get("user_id"));
>>> return builder.toString();
>>> }).process(new MatchFunction()).print();
>>> }
>>> catch (Exception e){
>>> e.printStackTrace();
>>> }
>>> env.execute("start session process");
>>>
>>> }
>>>
>>> public static class SessionAssigner implements 
>>> AssignerWithPunctuatedWatermarks  {
>>> @Override
>>> public long extractTimestamp(GenericRecord record, long 
>>> previousElementTimestamp) {
>>> long timestamp = (long) record.get("event_ts");
>>> System.out.println("timestamp--"+ timestamp);
>>> return timestamp;
>>> }
>>>
>>> @Override
>>> public Watermark checkAndGetNextWatermark(GenericRecord record, 
>>> long extractedTimestamp) {
>>> // simply emit a watermark with every event

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Robert Metzger
side note: this question has been asked on SO as well:
https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
(I'm mentioning this here so that we are not wasting support resources in
our community on double-debugging issues)

On Mon, Mar 2, 2020 at 5:36 PM aj  wrote:

> Hi Dawid,
>
> Currently, I am testing with a single source and parallelism 1 only, so
> I cannot explain this behavior.
>
> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Anuj,
>>
>> What parallelism has your source? Do all of your source tasks produce
>> records? Watermark is always the minimum of timestamps seen from all the
>> upstream operators. Therefore if some of them do not produce records the
>> watermark will not progress. You can read more about Watermarks and how
>> they work here:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>
>> Hope that helps
>>
>> Best,
>>
>> Dawid
>> On 02/03/2020 16:26, aj wrote:
>>
>> I am trying to use process function to some processing on a set of
>> events. I am using event time and keystream. The issue I am facing is The
>> watermark value is always coming as 9223372036854725808. I have put print
>> statement to debug and it shows like this:
>>
>> timestamp--1583128014000 extractedTimestamp 1583128014000
>> currentwatermark-9223372036854775808
>>
>> timestamp--1583128048000 extractedTimestamp 1583128048000
>> currentwatermark-9223372036854775808
>>
>> timestamp--1583128089000 extractedTimestamp 1583128089000
>> currentwatermark-9223372036854775808
>>
>> timestamp and extracted timestamp changing but watermark not getting
>> updated. So no record is getting in the queue as context.timestamp is never
>> less than the watermark.
>>
>>
>> DataStream dataStream = 
>> env.addSource(searchConsumer).name("search_list_keyless");
>> DataStream dataStreamWithWaterMark =  
>> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>
>>try {
>> dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>> StringBuilder builder = new StringBuilder();
>> builder.append(record.get("session_id"));
>> builder.append(record.get("user_id"));
>> return builder.toString();
>> }).process(new MatchFunction()).print();
>> }
>> catch (Exception e){
>> e.printStackTrace();
>> }
>> env.execute("start session process");
>>
>> }
>>
>> public static class SessionAssigner implements 
>> AssignerWithPunctuatedWatermarks  {
>> @Override
>> public long extractTimestamp(GenericRecord record, long 
>> previousElementTimestamp) {
>> long timestamp = (long) record.get("event_ts");
>> System.out.println("timestamp--"+ timestamp);
>> return timestamp;
>> }
>>
>> @Override
>> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
>> extractedTimestamp) {
>> // simply emit a watermark with every event
>> System.out.println("extractedTimestamp "+extractedTimestamp);
>> return new Watermark(extractedTimestamp - 3);
>> }
>>  }
>>
>>@Override
>> public void processElement(GenericRecord record, Context context, 
>> Collector collector) throws Exception {
>>
>> TimerService timerService = context.timerService();
>> System.out.println("currentwatermark"+ 
>> timerService.currentWatermark());
>> if (context.timestamp() > timerService.currentWatermark()) {
>>
>> Tuple2> queueval = 
>> queueState.value();
>> PriorityQueue queue = queueval.f1;
>> long startTime = queueval.f0;
>> System.out.println("starttime"+ startTime);
>>
>> if (queue == null) {
>> queue = new PriorityQueue<>(10, new TimeStampComprator());
>> startTime = (long) record.get("event_ts");
>> }
>> queueState.update(new Tuple2<>(startTime, queue));
>> timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>> }
>> }
>>
>> }
>>
>> Please help me understand what I am doing wrong.
>>
>>  --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
>


Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
Hi Dawid,

Currently, I am testing with a single source and parallelism 1 only, so
I cannot explain this behavior.

On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
wrote:

> Hi Anuj,
>
> What parallelism has your source? Do all of your source tasks produce
> records? Watermark is always the minimum of timestamps seen from all the
> upstream operators. Therefore if some of them do not produce records the
> watermark will not progress. You can read more about Watermarks and how
> they work here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>
> Hope that helps
>
> Best,
>
> Dawid
> On 02/03/2020 16:26, aj wrote:
>
> I am trying to use process function to some processing on a set of events.
> I am using event time and keystream. The issue I am facing is The watermark
> value is always coming as 9223372036854725808. I have put print statement
> to debug and it shows like this:
>
> timestamp--1583128014000 extractedTimestamp 1583128014000
> currentwatermark-9223372036854775808
>
> timestamp--1583128048000 extractedTimestamp 1583128048000
> currentwatermark-9223372036854775808
>
> timestamp--1583128089000 extractedTimestamp 1583128089000
> currentwatermark-9223372036854775808
>
> timestamp and extracted timestamp changing but watermark not getting
> updated. So no record is getting in the queue as context.timestamp is never
> less than the watermark.
>
>
> DataStream dataStream = 
> env.addSource(searchConsumer).name("search_list_keyless");
> DataStream dataStreamWithWaterMark =  
> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
>try {
> dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
> StringBuilder builder = new StringBuilder();
> builder.append(record.get("session_id"));
> builder.append(record.get("user_id"));
> return builder.toString();
> }).process(new MatchFunction()).print();
> }
> catch (Exception e){
> e.printStackTrace();
> }
> env.execute("start session process");
>
> }
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks  {
> @Override
> public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> System.out.println("timestamp--"+ timestamp);
> return timestamp;
> }
>
> @Override
> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> System.out.println("extractedTimestamp "+extractedTimestamp);
> return new Watermark(extractedTimestamp - 3);
> }
>  }
>
>@Override
> public void processElement(GenericRecord record, Context context, 
> Collector collector) throws Exception {
>
> TimerService timerService = context.timerService();
> System.out.println("currentwatermark"+ 
> timerService.currentWatermark());
> if (context.timestamp() > timerService.currentWatermark()) {
>
> Tuple2> queueval = 
> queueState.value();
> PriorityQueue queue = queueval.f1;
> long startTime = queueval.f0;
> System.out.println("starttime"+ startTime);
>
> if (queue == null) {
> queue = new PriorityQueue<>(10, new TimeStampComprator());
> startTime = (long) record.get("event_ts");
> }
> queueState.update(new Tuple2<>(startTime, queue));
> timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
> }
> }
>
> }
>
> Please help me understand what I am doing wrong.
>
>  --
> Thanks & Regards,
> Anuj Jain
>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Dawid Wysakowicz
Hi Anuj,

What is the parallelism of your source? Do all of your source tasks produce
records? The watermark is always the minimum of the watermarks received from
all the upstream operators. Therefore, if some of them do not produce records,
the watermark will not progress. You can read more about watermarks and how
they work here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
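
The rule described above can be illustrated with a small plain-Java sketch. It does not use any Flink API; the class and method names are hypothetical, and representing a channel that has not sent a watermark yet by Long.MIN_VALUE is an assumption made for illustration. It only shows why one silent input holds the combined watermark back.

```java
/** Plain-Java sketch (no Flink dependency) of how an operator combines input watermarks. */
class CombinedWatermarkSketch {

    /**
     * Returns the minimum of the latest watermark seen on each input channel.
     * A channel that has not sent any watermark yet is represented by Long.MIN_VALUE
     * (an assumption of this sketch). For an empty array this returns Long.MAX_VALUE.
     */
    static long combinedWatermark(long[] latestWatermarkPerChannel) {
        long min = Long.MAX_VALUE;
        for (long watermark : latestWatermarkPerChannel) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        long[] allActive = {1583128014000L, 1583128048000L};
        long[] oneSilent = {1583128089000L, Long.MIN_VALUE};
        // Both channels produce: the slower one decides.
        System.out.println(combinedWatermark(allActive));
        // One channel never produced: the combined watermark does not progress.
        System.out.println(combinedWatermark(oneSilent));
    }
}
```

With a single source at parallelism 1 there is only one channel, so this rule alone would not explain a watermark that never advances.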

Hope that helps

Best,

Dawid

On 02/03/2020 16:26, aj wrote:
>
> I am trying to use process function to some processing on a set of
> events. I am using event time and keystream. The issue I am facing is
> The watermark value is always coming as 9223372036854725808. I have
> put print statement to debug and it shows like this:
>
> timestamp--1583128014000 extractedTimestamp 1583128014000
> currentwatermark-9223372036854775808
>
> timestamp--1583128048000 extractedTimestamp 1583128048000
> currentwatermark-9223372036854775808
>
> timestamp--1583128089000 extractedTimestamp 1583128089000
> currentwatermark-9223372036854775808
>
> timestamp and extracted timestamp changing but watermark not getting
> updated. So no record is getting in the queue as context.timestamp is
> never less than the watermark.
>
>
> DataStream<GenericRecord> dataStream =
>     env.addSource(searchConsumer).name("search_list_keyless");
> DataStream<GenericRecord> dataStreamWithWaterMark =
>     dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
> try {
>     dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>         StringBuilder builder = new StringBuilder();
>         builder.append(record.get("session_id"));
>         builder.append(record.get("user_id"));
>         return builder.toString();
>     }).process(new MatchFunction()).print();
> } catch (Exception e) {
>     e.printStackTrace();
> }
> env.execute("start session process");
> }
>
> public static class SessionAssigner implements
>         AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         System.out.println("timestamp--" + timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>         // simply emit a watermark with every event
>         System.out.println("extractedTimestamp " + extractedTimestamp);
>         return new Watermark(extractedTimestamp - 3);
>     }
> }
>
> @Override
> public void processElement(GenericRecord record, Context context,
>         Collector collector) throws Exception {
>     TimerService timerService = context.timerService();
>     System.out.println("currentwatermark" + timerService.currentWatermark());
>     if (context.timestamp() > timerService.currentWatermark()) {
>         Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>         PriorityQueue<GenericRecord> queue = queueval.f1;
>         long startTime = queueval.f0;
>         System.out.println("starttime" + startTime);
>         if (queue == null) {
>             queue = new PriorityQueue<>(10, new TimeStampComprator());
>             startTime = (long) record.get("event_ts");
>         }
>         queueState.update(new Tuple2<>(startTime, queue));
>         timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>     }
> }
> }
>
> Please help me understand what I am doing wrong.
> -- 
> Thanks & Regards,
> Anuj Jain
>
>
>




Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
I am trying to use a process function to do some processing on a set of events.
I am using event time and a keyed stream. The issue I am facing is that the
watermark value is always -9223372036854775808. I have put in print statements
to debug, and they show this:

timestamp--1583128014000 extractedTimestamp 1583128014000
currentwatermark-9223372036854775808

timestamp--1583128048000 extractedTimestamp 1583128048000
currentwatermark-9223372036854775808

timestamp--1583128089000 extractedTimestamp 1583128089000
currentwatermark-9223372036854775808

The timestamp and the extracted timestamp change, but the watermark is not
updated. So no record gets into the queue, as context.timestamp is never
less than the watermark.
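
For reference, the number in the debug output is exactly Java's Long.MIN_VALUE. As far as I know, that is the value a Flink timer service reports before the operator has received any watermark, so the output suggests that no watermark has reached the process function yet. The small plain-Java check below only verifies the numeric identity; the class and method names are hypothetical, and the parsing assumes the "currentwatermark" prefix used in the print statement.

```java
/** Plain-Java check that the printed watermark is Long.MIN_VALUE; no Flink dependency. */
class WatermarkSentinelCheck {

    private static final String PREFIX = "currentwatermark";

    /** Parses a debug line such as "currentwatermark-9223372036854775808". */
    static long parseWatermark(String debugLine) {
        // Everything after the prefix is the signed long value, including the minus sign.
        return Long.parseLong(debugLine.substring(PREFIX.length()));
    }

    /** True if the line shows a watermark that has never been advanced. */
    static boolean isInitialWatermark(String debugLine) {
        return parseWatermark(debugLine) == Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(isInitialWatermark("currentwatermark-9223372036854775808"));
    }
}
```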


DataStream<GenericRecord> dataStream =
    env.addSource(searchConsumer).name("search_list_keyless");
DataStream<GenericRecord> dataStreamWithWaterMark =
    dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

try {
    dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
        StringBuilder builder = new StringBuilder();
        builder.append(record.get("session_id"));
        builder.append(record.get("user_id"));
        return builder.toString();
    }).process(new MatchFunction()).print();
} catch (Exception e) {
    e.printStackTrace();
}
env.execute("start session process");

}

public static class SessionAssigner implements
        AssignerWithPunctuatedWatermarks<GenericRecord> {
    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        System.out.println("timestamp--" + timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        System.out.println("extractedTimestamp " + extractedTimestamp);
        return new Watermark(extractedTimestamp - 3);
    }
}

@Override
public void processElement(GenericRecord record, Context context,
        Collector collector) throws Exception {

    TimerService timerService = context.timerService();
    System.out.println("currentwatermark" + timerService.currentWatermark());
    if (context.timestamp() > timerService.currentWatermark()) {

        Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
        PriorityQueue<GenericRecord> queue = queueval.f1;
        long startTime = queueval.f0;
        System.out.println("starttime" + startTime);

        if (queue == null) {
            queue = new PriorityQueue<>(10, new TimeStampComprator());
            startTime = (long) record.get("event_ts");
        }
        queueState.update(new Tuple2<>(startTime, queue));
        timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
    }
}

}

Please help me understand what I am doing wrong.


-- 
Thanks & Regards,
Anuj Jain






Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Thanks, Gordon. Please see my reply. I ran the code, but the result is not
what the documentation describes.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen

Code showing the effect of maxOutOfOrderness:
dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

1. Sending data WITHOUT Thread.sleep() gives this result:
1,1483250636000|
4,148325064|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

2. Sending data WITH Thread.sleep() gives the result below. It shows the
effect of maxOutOfOrderness: the calculation is delayed, and only then does
the result come out.
1,1483250636000|
2,148325064|1483250642000|
3,1483250649000|1483250648000|1483250645000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|
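
Both result lists can be reproduced with a plain-Java simulation of 5-second tumbling event-time windows, sketched below under several assumptions. It uses no Flink API and all names are hypothetical. It assumes the two shortened timestamps in the test data stand for 1483250640000 and 1483250650000, that the elements arrive in the order of the test data listed in this thread, that with Thread.sleep() the watermark (max timestamp seen minus 5000 ms) advances after every element, and that without the sleep no watermark is emitted before the input ends. An element is treated as late, and dropped, when the watermark has already reached the last timestamp of its window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java simulation of 5 s tumbling event-time windows; no Flink dependency. */
class TumblingWindowSimulation {
    static final long WINDOW_SIZE_MS = 5_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;

    /** Test data in arrival order; the first and fifth values are assumed completions. */
    static final long[] TEST_DATA = {
        1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
        1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
        1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
        1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L
    };

    /** Start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    /**
     * Returns the element count of each window, in firing order.
     * If watermarkPerElement is true, the watermark advances after every element;
     * otherwise it only advances once, at the end of the input.
     */
    static List<Integer> run(long[] timestamps, boolean watermarkPerElement) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> element count
        List<Integer> fired = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = windowStart(ts);
            // Keep the element only if the watermark has not yet passed its window.
            if (start + WINDOW_SIZE_MS - 1 > watermark) {
                open.merge(start, 1, Integer::sum);
            }
            if (watermarkPerElement) {
                maxSeen = Math.max(maxSeen, ts);
                watermark = maxSeen - MAX_OUT_OF_ORDERNESS_MS;
                fire(open, watermark, fired);
            }
        }
        fire(open, Long.MAX_VALUE, fired); // end of input flushes all remaining windows
        return fired;
    }

    /** Fires, oldest first, every window whose last timestamp the watermark has reached. */
    private static void fire(TreeMap<Long, Integer> open, long watermark, List<Integer> fired) {
        while (!open.isEmpty() && open.firstKey() + WINDOW_SIZE_MS - 1 <= watermark) {
            fired.add(open.pollFirstEntry().getValue());
        }
    }

    public static void main(String[] args) {
        System.out.println(run(TEST_DATA, false)); // counts as in result 1
        System.out.println(run(TEST_DATA, true));  // counts as in result 2
    }
}
```

Under these assumptions the first call yields the counts 1, 4, 4, 2, 1, 3, 1 and the second yields 1, 2, 3, 2, 1, 3, 1, which match the two result lists. The difference comes from elements such as 1483250641000 that arrive after the watermark has passed their window and are dropped.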

I don't know how to explain eventTime, lateness, and maxOutOfOrderness.





Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Hi Eron,
Thanks for your help. I know that maxOutOfOrderness and lateness are based
on event time, but in my test they are not. Below are my code and test data.
 "key1|148325064|",
 "key1|1483250636000|",
 "key1|1483250649000|",
 "key1|1483250642000|",
 "key1|148325065|",
 "key1|1483250641000|",
 "key1|1483250653000|",
 "key1|1483250648000|",
 "key1|1483250645000|",
 "key1|1483250658000|",
 "key1|1483250647000|",
 "key1|1483250643000|",
 "key1|1483250661000|",
 "key1|1483250662000|",
 "key1|1483250667000|",
 "key1|1483250663000|",

dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    })

1. Sending data WITHOUT Thread.sleep() gives this result:
 1,1483250636000|
 4,148325064|1483250642000|1483250641000|1483250643000|
 4,1483250649000|1483250648000|1483250645000|1483250647000|
 2,148325065|1483250653000|
 1,1483250658000|
 3,1483250661000|1483250662000|1483250663000|
 1,1483250667000|
2. Sending data WITH Thread.sleep() gives the result below. It shows the
effect of allowedLateness: the window is triggered to calculate again, so
the result comes out again.
  1,1483250636000|
  1,148325064|
  2,148325064|1483250642000|
  1,1483250649000|
  2,1483250649000|1483250648000|
  3,1483250649000|1483250648000|1483250645000|
  2,148325065|1483250653000|
  1,1483250658000|
  2,1483250661000|1483250662000|
  3,1483250661000|1483250662000|1483250663000|
  1,1483250667000|






Re: flink eventTime, lateness, maxoutoforderness

2017-12-18 Thread Tzu-Li (Gordon) Tai
Hi,

> Is lateness record time or real-world time?
> Is maxoutoforderness record time or real-world time?

Both the allowed lateness of window operators and the maxOutOfOrderness of the
BoundedOutOfOrdernessTimestampExtractor refer to event time.

i.e.,
- given the end timestamp of a window is x (in event time) and allowed lateness 
is y, the window state is cleared only when the current watermark of the window 
operator passes x+y
- the BoundedOutOfOrdernessTimestampExtractor emits watermarks that lag behind 
the max record timestamp (in event time) by a fixed amount of time (again, in 
event time).
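
The two rules above come down to simple arithmetic on event-time values. The plain-Java sketch below restates them as described in this message; it does not call any Flink API, and the class and method names are hypothetical.

```java
/** Plain-Java sketch of the two event-time rules described above; no Flink dependency. */
class EventTimeRulesSketch {

    /** Watermark that must be passed before window state is cleared: end x plus lateness y. */
    static long stateCleanupThreshold(long windowEndMs, long allowedLatenessMs) {
        return windowEndMs + allowedLatenessMs;
    }

    /** Watermark that lags the maximum event timestamp seen so far by a fixed bound. */
    static long laggingWatermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        // A 5 s window ending at 1483250645000 with 5 s allowed lateness.
        System.out.println(stateCleanupThreshold(1483250645000L, 5_000L));
        // Max record timestamp 1483250649000 with 5 s maxOutOfOrderness.
        System.out.println(laggingWatermark(1483250649000L, 5_000L));
    }
}
```

Neither value depends on the wall clock: both are derived only from timestamps carried by the records.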

Hope this clarifies things for you!

Cheers,
Gordon
On 16 December 2017 at 8:07:15 AM, chen (eric__...@126.com) wrote:

eventTime, lateness, and maxoutoforderness are all about time.
Event time is the watermark time on the record.
Is lateness record time or real-world time?
Is maxoutoforderness record time or real-world time?

dataStream.keyBy(row -> (String)row.getField(0))  
.window(TumblingEventTimeWindows.of(Time.seconds(5)))  
.allowedLateness(Time.seconds(5))  
.fold(initRow(), new MyFoldFunction())  

public Watermark getCurrentWatermark() {  
return new Watermark(currentTime - 5000);}  

Could anyone explain the notion of time behind eventTime, lateness, and maxoutoforderness?



 


Re: flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread Eron Wright
Take a look at the section of Flink documentation titled "Event Time and
Watermarks":
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time-and-watermarks

Also read the excellent series "Streaming 101" and "Streaming 102", which has
useful animations depicting the flow of time:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Think of the watermark as a clock, ticking along due to information from
the connector or from a watermark generator.

Hope this helps,
Eron


On Sat, Dec 16, 2017 at 8:07 AM, chen  wrote:

> eventTime, lateness, and maxoutoforderness are all about time.
> Event time is the watermark time on the record.
> Is lateness record time or real-world time?
> Is maxoutoforderness record time or real-world time?
>
> dataStream.keyBy(row -> (String)row.getField(0))
> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>  .allowedLateness(Time.seconds(5))
>  .fold(initRow(), new MyFoldFunction())
>
> public Watermark getCurrentWatermark() {
> return new Watermark(currentTime - 5000);}
>
> Could anyone explain the notion of time behind eventTime, lateness, and
> maxoutoforderness?
>
>
>
>


flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread chen
eventTime, lateness, and maxoutoforderness are all about time.
Event time is the watermark time on the record.
Is lateness record time or real-world time?
Is maxoutoforderness record time or real-world time?

dataStream.keyBy(row -> (String)row.getField(0))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
 .allowedLateness(Time.seconds(5))
 .fold(initRow(), new MyFoldFunction())

public Watermark getCurrentWatermark() {
return new Watermark(currentTime - 5000);}

Could anyone explain the notion of time behind eventTime, lateness, and maxoutoforderness?


