RE: Access to CheckpointStatsCounts

2019-12-05 Thread min.tan
Thanks

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Thursday, 5 December 2019 10:55
To: Tan, Min
Cc: vino yang; user
Subject: [External] Re: Access to CheckpointStatsCounts

Hey Min,

If checking for empty map states works for you, this could be an option, yes. 
Alternatively, check this out:

CheckpointedFunction.initializeState() will pass you a 
FunctionInitializationContext, which has a method called ".isRestored()"
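For illustration, a minimal sketch of that approach (the class and field names
below are just placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Sketch: an operator that remembers whether it was restored from a
// checkpoint/savepoint or started fresh.
public class RestoreAwareMapper extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private transient boolean restored;

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // true if this task was restored from a checkpoint or savepoint
        restored = context.isRestored();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // nothing extra to snapshot in this sketch
    }

    @Override
    public String map(String value) {
        return (restored ? "restored: " : "fresh: ") + value;
    }
}

Note that isRestored() is available inside operators, not in the main method.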
Best,
Robert

On Thu, Dec 5, 2019 at 10:18 AM min@ubs.com wrote:
Dear Robert,

Thank you very much for sending your reply.

What we are trying to achieve is the following:

1)  In a normal situation, checkpoints or savepoints are preserved, and the 
application restarts from one of these paths (with the configurations kept in 
map states).

2)  Sometimes, e.g. during a version update (with a modified Kafka topic 
name), we may be unable to restart the application from one of these 
checkpoints or savepoints. We have to restart from scratch.

We would like to detect, inside the main method, whether a job starts from a 
checkpoint or from scratch.
Then we can have a broadcast stream read its configuration from either the 
"earliest" or the "latest" offset of a Kafka topic.

Or should we just check whether the map states are empty instead?
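Something like the following is what we have in mind (topic name, properties and
the fresh-start flag are placeholders, e.g. passed as a job parameter; the usual
StreamExecutionEnvironment env and Kafka connector imports are assumed):

boolean freshStart = ParameterTool.fromArgs(args).getBoolean("fresh-start", false);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "config-reader");

FlinkKafkaConsumer<String> configSource =
        new FlinkKafkaConsumer<>("config-topic", new SimpleStringSchema(), props);

if (freshStart) {
    // no usable checkpoint/savepoint: re-read the whole configuration topic
    configSource.setStartFromEarliest();
} else {
    // restored run: only pick up new configuration records
    configSource.setStartFromLatest();
}

// note: on an actual restore, offsets stored in the checkpoint take precedence
// over these start positions
DataStream<String> configStream = env.addSource(configSource).uid("config-source");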

Regards,

Min

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Thursday, 5 December 2019 09:47
To: Tan, Min
Cc: vino yang; user
Subject: [External] Re: Access to CheckpointStatsCounts

Hey Min,

When a Flink job recovers after a failure, the main method is not re-executed. 
The main method is only executed when the job is submitted. The JobManager 
executing the job receives the final job graph, and on failure the JobManager 
will restore the job based on that job graph.
If you start a Flink job from scratch, it will use the current configuration. 
On a failure recovery, the Flink job will rely on the checkpoint.

Please let me know if your problem has been resolved with this information. If 
not, I need a bit more information on what you are trying to achieve, so that I 
can help you better.

Best,
Robert


On Mon, Dec 2, 2019 at 10:12 AM min@ubs.com wrote:
Many thanks for sending your reply.

It is not for monitoring but for configuration.

For a job starting from an empty status, we would like to load fresh 
configurations.
For a job recovering from a checkpoint, we would like to rely on the checkpoint.

Regards,

Min

From: vino yang [mailto:yanghua1...@gmail.com]
Sent: Monday, 2 December 2019 10:09
To: Tan, Min
Cc: user
Subject: [External] Re: Access to CheckpointStatsCounts

Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST 
API [1] for this.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints
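For example, something like
curl http://<jobmanager>:8081/jobs/<job-id>/checkpoints
returns a "counts" section with the numbers of restored, completed, in-progress 
and failed checkpoints (host, port and job id above are placeholders).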

Best,
Vino

min@ubs.com wrote on Monday, 2 December 2019 at 5:01 PM:
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method of a 
job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


RE: Access to CheckpointStatsCounts

2019-12-05 Thread min.tan
Dear Robert,

Thank you very much for sending your reply.

What we are trying to achieve is the following:

1)  In a normal situation, checkpoints or savepoints are preserved, and the 
application restarts from one of these paths (with the configurations kept in 
map states).

2)  Sometimes, e.g. during a version update (with a modified Kafka topic 
name), we may be unable to restart the application from one of these 
checkpoints or savepoints. We have to restart from scratch.

We would like to detect, inside the main method, whether a job starts from a 
checkpoint or from scratch.
Then we can have a broadcast stream read its configuration from either the 
"earliest" or the "latest" offset of a Kafka topic.

Or should we just check whether the map states are empty instead?

Regards,

Min

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Thursday, 5 December 2019 09:47
To: Tan, Min
Cc: vino yang; user
Subject: [External] Re: Access to CheckpointStatsCounts

Hey Min,

When a Flink job recovers after a failure, the main method is not re-executed. 
The main method is only executed when the job is submitted. The JobManager 
executing the job receives the final job graph, and on failure the JobManager 
will restore the job based on that job graph.
If you start a Flink job from scratch, it will use the current configuration. 
On a failure recovery, the Flink job will rely on the checkpoint.

Please let me know if your problem has been resolved with this information. If 
not, I need a bit more information on what you are trying to achieve, so that I 
can help you better.

Best,
Robert


On Mon, Dec 2, 2019 at 10:12 AM min@ubs.com wrote:
Many thanks for sending your reply.

It is not for monitoring but for configuration.

For a job starting from an empty status, we would like to load fresh 
configurations.
For a job recovering from a checkpoint, we would like to rely on the checkpoint.

Regards,

Min

From: vino yang [mailto:yanghua1...@gmail.com]
Sent: Monday, 2 December 2019 10:09
To: Tan, Min
Cc: user
Subject: [External] Re: Access to CheckpointStatsCounts

Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST 
API [1] for this.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints

Best,
Vino

min@ubs.com wrote on Monday, 2 December 2019 at 5:01 PM:
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method of a 
job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


RE: Access to CheckpointStatsCounts

2019-12-02 Thread min.tan
Many thanks for sending your reply.

It is not for monitoring but for configuration.

For a job starting from an empty status, we would like to load fresh 
configurations.
For a job recovering from a checkpoint, we would like to rely on the checkpoint.

Regards,

Min

From: vino yang [mailto:yanghua1...@gmail.com]
Sent: Monday, 2 December 2019 10:09
To: Tan, Min
Cc: user
Subject: [External] Re: Access to CheckpointStatsCounts

Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST 
API [1] for this.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints

Best,
Vino

min@ubs.com wrote on Monday, 2 December 2019 at 5:01 PM:
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method of a 
job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


Access to CheckpointStatsCounts

2019-12-02 Thread min.tan
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method of a 
job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


Savepoints and checkpoints

2019-11-21 Thread min.tan
Hi,

Are Flink savepoints and checkpoints still valid after some data entity changes, 
e.g. Kafka topic name changes? I expect the answer is "No"?
Similarly, are Flink savepoints and checkpoints still valid after some job 
graph changes, e.g. one stateful operator splits into two? I expect the answer 
is "No"?

Regards,

Min



RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thank you for your reply.

Is there any tool that enables us to statically inspect (list) all the "uid"ed 
operators, or all the operators, for a jar?

Also, addSink and addSource are not on the operator list 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/

But they both have a uid method. Are these two operators or not?

Regards,

Min


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 10:24
To: Tan, Min
Cc: John Smith; user@flink.apache.org
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

It means that there is an operator state which has no corresponding operator in 
the new job. It usually indicates that the uid of a stateful operator has 
changed.

On 25 October 2019 at 4:12 PM, min@ubs.com wrote:

Thanks for your reply.

Our sources and sinks are connected to Kafka, therefore they are stateful.

We did not set uid() on them, only name().

The log says
Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
file:/var/flink/data-remote/savepoint-00-dae014102550.
 Cannot map checkpoint/savepoint state for operator 
484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

Regards,

Min


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

It depends on the source/sink implementation. If the source/sink implementation 
uses state, uid should be set. So you can always set the uid in this case and 
then you don't need to care about the implementation details of the source/sink 
you used.

name() doesn't have such functionality.

Regarding the uid mismatch you encountered, could you share the exception 
log?

Regards,
Dian

On 25 October 2019 at 3:38 PM, min@ubs.com wrote:

Thank you very much for your helpful response.

Our new production release complains about a uid mismatch (we use exactly-once 
checkpoints).
I hope I understand you correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSource? Do they need a 
uid, or does name() serve a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to match the operator state stored in the 
checkpoint/savepoint to an operator [1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, Flink will generate a uid 
for it in a deterministic way [2]. The generated uid doesn't change for the 
same job.
2) However, it's encouraged to set the uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. after adding/removing operators in the job graph. If 
you want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78
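As a rough sketch of the above (topic names, schema and properties are 
placeholders, using the 1.9-era Kafka connector classes):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "example");

FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
FlinkKafkaProducer<String> sink =
        new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);

env.addSource(source)
        .uid("kafka-source")        // stable id for the source's offset state
        .map(String::toUpperCase)   // stateless: uid optional, name() for readability
        .name("to-upper-case")
        .addSink(sink)
        .uid("kafka-sink");         // stable id for the sink's transactional state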

On 24 October 2019 at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Thursday, 24 October 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu 
<dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On 24 October 2019 at 5:42 AM, John Smith 
> <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?

RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thanks for your reply.

Our sources and sinks are connected to Kafka, therefore they are stateful.

We did not set uid() on them, only name().

The log says
Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
file:/var/flink/data-remote/savepoint-00-dae014102550.
 Cannot map checkpoint/savepoint state for operator 
484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

Regards,

Min


From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

It depends on the source/sink implementation. If the source/sink implementation 
uses state, uid should be set. So you can always set the uid in this case and 
then you don't need to care about the implementation details of the source/sink 
you used.

name() doesn't have such functionality.

Regarding the uid mismatch you encountered, could you share the exception 
log?

Regards,
Dian

On 25 October 2019 at 3:38 PM, min@ubs.com wrote:

Thank you very much for your helpful response.

Our new production release complains about a uid mismatch (we use exactly-once 
checkpoints).
I hope I understand you correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSource? Do they need a 
uid, or does name() serve a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to match the operator state stored in the 
checkpoint/savepoint to an operator [1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, Flink will generate a uid 
for it in a deterministic way [2]. The generated uid doesn't change for the 
same job.
2) However, it's encouraged to set the uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. after adding/removing operators in the job graph. If 
you want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78

On 24 October 2019 at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Thursday, 24 October 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu 
<dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On 24 October 2019 at 5:42 AM, John Smith 
> <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


RE: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread min.tan
Thank you very much for your helpful response.

Our new production release complains about a uid mismatch (we use exactly-once 
checkpoints).
I hope I understand you correctly: map and print are certainly stateless, 
therefore no uid is required. What about addSink and addSource? Do they need a 
uid, or does name() serve a similar function?

Regards,

Min

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Friday, 25 October 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Hi Min,

The uid is used to match the operator state stored in the 
checkpoint/savepoint to an operator [1]. So you only need to specify the uid for 
stateful operators.
1) If you have not specified the uid for an operator, Flink will generate a uid 
for it in a deterministic way [2]. The generated uid doesn't change for the 
same job.
2) However, it's encouraged to set the uid for stateful operators to allow for job 
evolution. The dynamically generated uid is not guaranteed to remain the same 
if the job has changed, i.e. after adding/removing operators in the job graph. If 
you want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the 
map and print operator.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#matching-operator-state
[2] 
https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78

On 24 October 2019 at 11:22 PM, min@ubs.com wrote:

Hi,

I have some simple questions on the uid as well.

1)  Do we add a uid for every operator e.g. print(), addSink and addSource?
2)  For chained operators, do we need uids for each operator? Or just 
the last operator?
e.g. .map().uid("some-id").print().uid("print-id");


Regards,

Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Thursday, 24 October 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu 
<dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On 24 October 2019 at 5:42 AM, John Smith 
> <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


RE: Does operator uid() have to be unique across all jobs?

2019-10-24 Thread min.tan
Hi,

I have some simple questions on the uid as well.


1)  Do we add a uid for every operator e.g. print(), addSink and addSource?

2)  For chained operators, do we need uids for each operator? Or just 
the last operator?

e.g. .map().uid("some-id").print().uid("print-id");




Regards,


Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Thursday, 24 October 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu 
<dian0511...@gmail.com> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> On 24 October 2019 at 5:42 AM, John Smith 
> <java.dev@gmail.com> wrote:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


Job recovery from a checkpoint

2019-09-11 Thread min.tan
Hi,

We can recover a job from a savepoint nicely after a restart of our Flink 
cluster using
bin/flink run -s :savepointPath [:runArgs]
The previous job states are recovered after this reload.
I expect I can do something similar to recover a Flink job from a checkpoint 
location after a restart of our Flink cluster (job manager and task manager) using
bin/flink run -s checkpointPath/_metadata [:runArgs]
It seems that our reloaded job does not keep the previous states of the job.

Am I doing something wrong? I suppose this is doable and no additional 
configuration is required?
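For reference, a small sketch of retaining checkpoints so that a 
checkpointPath/_metadata stays available for this kind of manual restore (the 
interval and path below are placeholders):

env.enableCheckpointing(10_000);
// keep completed checkpoints even after the job is cancelled, so they can be
// passed to "bin/flink run -s <checkpointPath>/_metadata"
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///var/flink/checkpoints"));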

Regards,

Min



RE: Recovery from job manager crash using check points

2019-08-21 Thread min.tan
Thanks for the helpful reply.

One more question: does this ZooKeeper or HA requirement apply to a savepoint?

Can I bounce a single-JobManager cluster and rerun my Flink job from its 
previous state with a savepoint directory? e.g.
./bin/flink run myJob.jar -s savepointDirectory

Regards,

Min

From: Zili Chen [mailto:wander4...@gmail.com]
Sent: Tuesday, 20 August 2019 04:16
To: Biao Liu
Cc: Tan, Min; user
Subject: [External] Re: Recovery from job manager crash using check points

Hi Min,

I guess you use standalone high availability, and when the TM fails,
the JM can recover the job from an in-memory checkpoint store.

However, when the JM fails, since you don't persist state in an HA backend
such as ZooKeeper, even if the JM is relaunched by the YARN RM or superseded by a
standby, the new one knows nothing about the previous jobs.

In short, you need to set up ZooKeeper, as you yourself mentioned.
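For reference, the ZooKeeper HA setup boils down to something like this in
flink-conf.yaml (hostnames and the storage path are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
# optional: isolates this cluster's HA data in ZooKeeper
high-availability.cluster-id: /my-flink-cluster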

Best,
tison.


Biao Liu <mmyy1...@gmail.com> wrote on Monday, 19 August 2019 at 11:49 PM:
Hi Min,

> Do I need to set up ZooKeeper to keep the state when a job manager crashes?

I guess you need to set up HA [1] properly. Besides that, I would suggest 
that you also check the state backend.

1. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
2. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html

Thanks,
Biao /'bɪ.aʊ/



On Mon, 19 Aug 2019 at 23:28, min@ubs.com wrote:
Hi,

I can use checkpoints to recover Flink state when a task manager crashes.

I cannot use checkpoints to recover Flink state when a job manager crashes.

Do I need to set up ZooKeeper to keep the state when a job manager crashes?

Regards

Min



Recovery from job manager crash using check points

2019-08-19 Thread min.tan
Hi,

I can use checkpoints to recover Flink state when a task manager crashes.

I cannot use checkpoints to recover Flink state when a job manager crashes.

Do I need to set up ZooKeeper to keep the state when a job manager crashes?

Regards

Min



Flink end to end integration test

2019-06-13 Thread min.tan
Hi,

I am new to Flink, at least to the  testing part.

We need an end to end integration test for a flink job.

Where can I find documentation for this?

I am envisaging a test similar to this:

1)  Start a local job instance in an IDE or a Maven test

2)  Fire event JSONs at the data source (i.e. a Kafka topic)

3)  Retrieve result JSONs from the data sink (i.e. a Kafka topic or an 
Elasticsearch index)

4)  Compare the result JSONs with the expected ones

Since our Flink job is a streaming one, how can we tear down the Flink job 
instance running in an IDE?

Regards,

Min



RE: Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread min.tan
Many thanks for your replies.

After I increased MinPauseBetweenCheckpoints and moved to a memory backend for 
checkpoints, the problem disappeared.

Thank you both again for your help.


Regards,

Min
From: Piotr Nowojski [mailto:pi...@ververica.com]
Sent: Thursday, 11 April 2019 15:01
To: Fabian Hueske
Cc: Tan, Min; user
Subject: [External] Re: Default Kafka producers pool size for 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE

Hi Min and Fabian,

The pool size is independent of the parallelism, task slots count or task 
managers count. The only thing that you should consider is how many 
simultaneous checkpoints you might have in your setup.

As Fabian wrote, with
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

The default value of the pool size of 5 should be more than enough.

Could you double-check whether something is overriding those configuration 
values? If not, could you provide the JobManager and TaskManager logs?
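If you do end up raising the pool size, it is one of the constructor arguments of 
FlinkKafkaProducer. Roughly like this (the exact overload is from memory, so please 
verify it against your connector version; producerProperties is a placeholder):

int poolSize = 10;  // should exceed the number of checkpoints that can be in flight
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
        "test.out",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProperties,
        Optional.empty(),                          // no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        poolSize);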

Piotrek


On 11 Apr 2019, at 09:32, Fabian Hueske 
<fhue...@gmail.com> wrote:

Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent 
of the parallelism of the sink operator.
From my understanding a pool size of 5 should be fine if the maximum number of 
concurrent checkpoints is 1.
Running out of connections would mean that there are 5 in-flight checkpoints 
that were not completed, which seems a lot to me (given that the sink is 
probably at the end of the program).

If I remember correctly, Piotr (in CC) was working on the exactly-once feature 
of the Kafka producer.
Maybe he can help.

Best,
Fabian

On Mon, 8 Apr 2019 at 14:43, min@ubs.com wrote:
Hi,

I keep getting exceptions 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
ongoing snapshots. Increase kafka producers pool size or decrease number of 
concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to 
increase this size. What considerations should I take into account to increase 
this size? What is a size for a normal setting, e.g. 32?

I run with a parallelism of 16 and have a checkpoint setting like this:

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min



RE: Re: Latency caused by Flink Checkpoint on EXACTLY_ONCE mode

2019-04-09 Thread min.tan
Many thanks for your quick reply.


1)  My implementation has no commits. All commits are done in the 
FlinkKafkaProducer class, I believe.



KeyedSerializationSchemaWrapper<String> keyedSerializationSchemaWrapper =
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());

new FlinkKafkaProducer<String>("test.out",
        keyedSerializationSchemaWrapper, KafkaProperities.getProperties(env),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);



If the latency can be as long as the checkpoint interval, it would not be 
ideal for a long interval setting, e.g. a few minutes.



2)  My parallelism is set at the job level, so I would expect each source, 
operator and sink to have the same parallelism. Actually, my test code 
only has one Kafka source, one map and one Kafka sink. It produced 
duplicates on a restart when I used the at-least-once mode.

Regards,

Min

From: Guowei Ma [mailto:guowei@gmail.com]
Sent: Sunday, 7 April 2019 08:42
To: Tan, Min
Cc: user@flink.apache.org
Subject: [External] Re: Latency caused by Flink Checkpoint on EXACTLY_ONCE mode

If your implementation only commits its changes after the completion of a 
checkpoint, I think the end-to-end latency is at least the checkpoint interval.

I think the document wants to say that a topology which only has 
flatMap/filter/map (no task has more than one input) can achieve exactly-once 
semantics even in at-least-once mode, since for such a topology the effect of 
barrier alignment in at-least-once mode happens to be the same as in exactly-once mode.

In some situations there might be some benefit in setting the 
source/sink/flatMap to the same parallelism (there may be other ways), since 
during alignment a task with many inputs will not process the elements behind 
the barrier in exactly-once mode until the barriers from all inputs arrive. (If 
your checkpoint interval is very, very long, I think there would be no difference.)


Best
Guowei

min@ubs.com wrote on Sunday, 7 April 2019 at 3:14 AM:
Hi,

I have a simple data pipeline of a Kafka source, a Flink map operator and a 
Kafka sink.

I have a quick question about latency caused by checkpointing in the 
exactly-once mode.

Since the changes are committed and become visible on checkpoint completion, the 
latency could be as long as the checkpoint interval, e.g. 5 seconds?

Is my understanding correct?

If I use the at-least-once mode, there will not be this additional latency. More 
interestingly, the Flink document 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html
indicates that "dataflows with only embarrassingly parallel streaming 
operations (map(), flatMap(), filter(), …) actually give exactly once 
guarantees even in at least once mode."

Unfortunately, I have not been able to achieve exactly-once with the 
at-least-once mode. Do I need more settings than I have with the exactly-once mode?

Many thanks for the advises in advance.

Min




--
Best,
Guowei


Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-08 Thread min.tan
Hi,

I keep getting exceptions 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many 
ongoing snapshots. Increase kafka producers pool size or decrease number of 
concurrent checkpoints."

I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to 
increase this size. What considerations should I take into account to increase 
this size? What is a size for a normal setting, e.g. 32?

I run with a parallelism of 16 and have a checkpoint setting like this:

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min


Latency caused by Flink Checkpoint on EXACTLY_ONCE mode

2019-04-06 Thread min.tan
Hi,

I have a simple data pipeline of a Kafka source, a Flink map operator and a
Kafka sink.

I have a quick question about latency caused by checkpointing in the
exactly-once mode.

Since the changes are committed and become visible on checkpoint completion,
the latency could be as long as the checkpoint interval, e.g. 5 seconds?

Is my understanding correct?

If I use the at-least-once mode, there will not be this additional latency. More
interestingly, the Flink document
https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html
indicates that "dataflows with only embarrassingly parallel streaming
operations (map(), flatMap(), filter(), ...) actually give exactly once
guarantees even in at least once mode."

Unfortunately, I have not been able to achieve exactly-once with the
at-least-once mode. Do I need more settings than I have with the exactly-once
mode?

Many thanks for the advice in advance.

Min




Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-05 Thread min.tan
Hi,

I keep getting exceptions
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
ongoing snapshots. Increase kafka producers pool size or decrease number of
concurrent checkpoints."

I think that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and that I need to increase
this size. What considerations should I take into account to increase this size?

I run with a parallelism of 16 and have a checkpoint setting like this:

public static void setup(StreamExecutionEnvironment env) {
    env.enableCheckpointing(2_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
    env.getCheckpointConfig().setCheckpointTimeout(10_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
    //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

Regards,

Min





Flink to MQ connector with checkpoint support for exactly once semantics

2019-03-13 Thread min.tan
Hi,

Our Flink jobs need to read messages from IBM MQ and write messages into IBM MQ.
Just wondering if there are already some MQ connectors with a two-phase-commit 
sink function (TwoPhaseCommitSinkFunction), or with CheckpointListener and 
CheckpointedFunction implemented, to support exactly-once semantics.
Many thanks in advance.
Min
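For reference, the usual building block for this in the DataStream API is 
TwoPhaseCommitSinkFunction; a bare skeleton (every MQ-specific part below is a 
placeholder) could look roughly like this:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Skeleton only: MqTransaction and the actual MQ calls need to be filled in.
public class MqExactlyOnceSink
        extends TwoPhaseCommitSinkFunction<String, MqExactlyOnceSink.MqTransaction, Void> {

    public static class MqTransaction {
        // e.g. a transactional MQ session / unit-of-work handle
    }

    public MqExactlyOnceSink() {
        super(new KryoSerializer<>(MqTransaction.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected MqTransaction beginTransaction() {
        return new MqTransaction();   // open a transactional MQ session
    }

    @Override
    protected void invoke(MqTransaction txn, String value, Context context) {
        // write the record into the open MQ transaction (not yet visible)
    }

    @Override
    protected void preCommit(MqTransaction txn) {
        // flush pending writes; runs as part of the checkpoint
    }

    @Override
    protected void commit(MqTransaction txn) {
        // make the writes visible once the checkpoint has completed
    }

    @Override
    protected void abort(MqTransaction txn) {
        // roll back the MQ transaction
    }
}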




Externalised checkpoint keeps ValueState after a crash of a Flink cluster

2019-03-04 Thread min.tan
Hi,

I have a question about keeping a ValueState after a Flink 1.7.2 cluster has 
crashed.

My Flink job is simple

1) read dummy events (an event only has a string Id) from a Kafka source.
2) do a count on input events and save it as a ValueState
3) set up an externalized checkpoint running every second
4) fire a number of events (e.g. 2K with four threads in parallel) and kill the 
task manager or job manager manually (kill processId) before the inputs are 
completed
5) restart with the checkpoint in a local file directory

When the task manager gets killed and its recovery task manager restarts, the 
checkpoint works well, i.e. the second picks up the events left by the first and 
produces a correct total count of events (e.g. 8K).

When the job manager gets killed and its recovery job manager restarts, the 
checkpoint does not work well. The second still picks up the events left by the 
first BUT it does not produce a correct total count of events (e.g. 8K minus 
the count done before the crash).

The command to start the recovery job is  bin/flink run -s 
file:///Users/min/Applications/flink1.7.2/log/c449fad6fbaff3daad6bc526b8a74d18

Any idea what I could have done incorrectly?

I expect an externalized checkpoint would keep any ValueState after a crash of 
the Flink cluster.

Thank you very much for your help in advance.

Regards,

Min


Flink 1.7.0 HA based on zookeepers

2019-01-11 Thread min.tan
Hi,

I have a simple HA setting with Flink 1.7.0:
Node1 (active master, active slave) Node2 (standby master, active slave)

Step 1, start-cluster.sh from Node1, no problem
Step 2, manually kill the active master on Node1, no problem and the standby 
master becomes active
Step 3, bin/jobmanager.sh start cluster on Node1
Step 4, manually kill the active master on Node2, no problem and the Node1 
master becomes active again

But the overview page does not show the active task managers any more.
Is this expected, or do I have errors in my HA settings?

There is no error in the Node2 logs but there are java.net.ConnectExceptions in 
Node1 logs.

Regards,

Min


--Exception logs in Node1
2019-01-11 11:32:33,222 WARN  akka.remote.transport.netty.NettyTransport
- Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: /Nod2:39075
2019-01-11 11:32:33,225 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system [akka.tcp://flink@Node2:39075] has 
failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@Node2:39075]] Caused by: [Connection refused: /Node2:39075]
2019-01-11 11:32:43,233 ERROR 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - 
Could not retrieve the redirect address.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka.tcp://flink@Node2:39075/user/dispatcher#655281870]] 
after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
-





Zookeeper shared by Flink and Kafka

2019-01-09 Thread min.tan
Hi,

I am new to Flink.

I have a question:
Can a ZooKeeper cluster be shared by a Flink cluster and a Kafka cluster?

Regards,

Min


RE: Re: Use event time

2018-12-07 Thread min.tan
Many thanks for sending your email.

Does this mean that the event time only affects which events are selected for a
time window?

Without a time window, does the event time then have no impact on the order of
records/events?

Is my understanding correct?

Thank you very much for your help.

Regards,

Min



From: Piotr Nowojski [mailto:pi...@data-artisans.com]
Sent: Freitag, 7. Dezember 2018 11:11
To: Tan, Min
Cc: user
Subject: [External] Re: Use event time

Hi again!

Flink doesn't order/sort the records according to event time. The prevailing
idea is:
- records will arrive out of order, and operators should handle that
- watermarks act as indicators of the current lower bound of the event
time "clock"

For example, windowed joins/aggregations assign records to one or more time
windows and collect all of the data belonging to a window; when the watermark
exceeds/overtakes a window, that is when the window is evaluated.

Piotrek
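
To make the window/watermark behaviour above concrete, here is a minimal,
self-contained sketch of a keyed tumbling event-time window in the DataStream
API of that era. The Event type, the keys and the "keep the latest event"
reduce are illustrative assumptions, not taken from this thread:

---
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowSketch {

    public static class Event {
        public int id;
        public long time1;
        public Event() {}
        public Event(int id, long time1) { this.id = id; this.time1 = time1; }
        @Override
        public String toString() { return "id=" + id + "; time1=" + time1; }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time instead of the default processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        long start = System.currentTimeMillis();
        DataStream<Event> events = env.fromElements(
                new Event(1, start + 10), new Event(2, start + 20), new Event(1, start + 30));

        events
            // assign event-time timestamps; the generated watermarks advance the event-time "clock"
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                @Override
                public long extractAscendingTimestamp(Event e) { return e.time1; }
            })
            .keyBy(new KeySelector<Event, Integer>() {
                @Override
                public Integer getKey(Event e) { return e.id; }
            })
            // a window is evaluated only once the watermark passes its end
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            // keep the latest event per key and window (purely illustrative)
            .reduce((a, b) -> a.time1 >= b.time1 ? a : b)
            .print();

        env.execute("event-time window sketch");
    }
}
---

A plain print after a map, as in the code of the "Use event time" thread below,
is not expected to reflect event-time order; only operators that react to
watermarks, such as the window above, do.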




Use event time

2018-12-07 Thread min.tan
Hi,

I am new to Flink.

I have the following small piece of code using event time. I did not get the result
I expected, i.e. events printed out in the order of their event time.

Did I miss something here?

Regards,

Min


--Event time--
// imports and the enclosing class declaration added for completeness; the class name is assumed
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class EventTimeTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        long start = System.currentTimeMillis();
        // time1 grows with the id, time2 mostly decreases
        DataStream<Event> stream = env.fromElements(
                new Event(0, start, start),
                new Event(1, start + 10, start + 10), new Event(2, start + 20, start - 20),
                new Event(3, start + 30, start - 30), new Event(4, start + 40, start - 40));

        stream.map(event -> "RAW order " + event.toString()).print();

        // timestamps taken from time1
        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event element) { return element.time1; }
                })
                .map(event -> "time1 order:: " + event.toString()).print();

        // timestamps taken from time2
        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event element) { return element.time2; }
                })
                .map(event -> "time2 order:: " + event.toString()).print();

        env.execute("event time");
    }

    public static class Event {
        int id;
        long time1;
        long time2;

        Event(int id, long time1, long time2) {
            this.id = id;
            this.time1 = time1;
            this.time2 = time2;
        }

        public String toString() {
            return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
        }
    }
}
--


A question on the Flink "rolling" FoldFunction

2018-12-07 Thread min.tan
Hi,

I am new to Flink. I have a question on this "rolling" fold function.

If its parallelism is larger than one, does the "rolling" order remain the
same, i.e. does it always keep the "1-2-3-4-5" increasing sequence?

Regards,

Min

--- 
FoldFunction 
---
A "rolling" fold on a keyed data stream with an initial value. Combines the 
current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the 
sequence "start-1", "start-1-2", "start-1-2-3", ...
DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
---
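
As an illustration of the fold above under a keyBy (a minimal sketch with
assumed tuple input and keys, not taken from this thread): all records with the
same key are routed to the same parallel instance, and records coming from the
same upstream task keep their relative order, so the fold for a given key
proceeds in that order even when the parallelism is greater than one.

---
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedFoldSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, value) pairs; the values for key "a" arrive as 1, 2, 3
        DataStream<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 10), Tuple2.of("a", 2),
                Tuple2.of("b", 20), Tuple2.of("a", 3));

        input
            // all records with the same key end up on the same parallel instance
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> t) { return t.f0; }
            })
            // one rolling fold per key, e.g. "start-1", "start-1-2", "start-1-2-3" for key "a"
            .fold("start", new FoldFunction<Tuple2<String, Integer>, String>() {
                @Override
                public String fold(String current, Tuple2<String, Integer> value) {
                    return current + "-" + value.f1;
                }
            })
            .print();

        env.execute("keyed fold sketch");
    }
}
---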

From: jincheng sun [mailto:sunjincheng...@gmail.com]
Sent: Freitag, 7. Dezember 2018 02:24
To: rakkukumar2...@gmail.com
Cc: user@flink.apache.org; d...@flink.apache.org
Subject: [External] Re: delay one of the datastream when performing join 
operation on event-time and watermark

Hi Rakesh Kumar,
I think you can use the interval-join, e.g.:

orderStream
    .keyBy(/* key selector */)
    .intervalJoin(invoiceStream.keyBy(/* key selector */))
    .between(Time.minutes(-5), Time.minutes(5))

The semantics of the interval-join and a detailed usage description can be found at
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join

Hope to help you, and any feedback is welcome!

Bests,
Jincheng
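
A fuller sketch of the interval-join suggested above, under assumed types: the
Order and Invoice POJOs, the orderId field and the inline test data are
placeholders, not taken from this thread, and the real job would read both
streams from Kafka and write the joined result to another Kafka topic.

---
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderInvoiceJoinSketch {

    // hypothetical POJOs; in the real job these would be deserialized from the Kafka topics
    public static class Order {
        public String orderId;
        public long ts;
        public Order() {}
        public Order(String orderId, long ts) { this.orderId = orderId; this.ts = ts; }
    }

    public static class Invoice {
        public String orderId;
        public long ts;
        public Invoice() {}
        public Invoice(String orderId, long ts) { this.orderId = orderId; this.ts = ts; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // interval joins work on event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        long t = System.currentTimeMillis();
        DataStream<Order> orders = env
                .fromElements(new Order("o-1", t), new Order("o-2", t + 1000))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
                    @Override
                    public long extractAscendingTimestamp(Order o) { return o.ts; }
                });
        DataStream<Invoice> invoices = env
                .fromElements(new Invoice("o-1", t + 60_000), new Invoice("o-2", t + 120_000))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Invoice>() {
                    @Override
                    public long extractAscendingTimestamp(Invoice i) { return i.ts; }
                });

        orders
            .keyBy(o -> o.orderId)
            .intervalJoin(invoices.keyBy(i -> i.orderId))
            // an invoice may fall up to 5 minutes before or after its order, in event time
            .between(Time.minutes(-5), Time.minutes(5))
            .process(new ProcessJoinFunction<Order, Invoice, String>() {
                @Override
                public void processElement(Order order, Invoice invoice, Context ctx, Collector<String> out) {
                    out.collect("order " + order.orderId + " joined with its invoice");
                }
            })
            .print(); // in the real job this would be a Kafka sink instead

        env.execute("order/invoice interval join sketch");
    }
}
---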


Rakesh Kumar mailto:rakkukumar2...@gmail.com>> 
于2018年12月6日周四 下午7:10写道:
Hi,
I have two data sources, one for order data and another for invoice data; I push
both into Kafka topics in JSON form. I want to delay the order data by 5 minutes,
because invoice data is generated only after the order data. For that I have
written a Flink program which takes both streams from Kafka, applies watermarks,
and delays the order data by 5 minutes. After applying watermarks, I want to join
the two streams on order_id, which is present in both the order and the invoice
data. After joining, I want to push the result to Kafka in a different topic.

However, I am not able to join these data streams with the 5 minute delay, and I
cannot figure out why.

I am attaching my Flink program and its dependencies below.
