Apache Flink - Question about application restart

2020-05-22 Thread M Singh
Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks



Apache Flink - Error on creating savepoints using REST interface

2020-05-22 Thread M Singh
Hi:
I am using Flink 1.6.2 and trying to create a savepoint using the following 
curl command using the following references 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html)
 and 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
curl -v -H "Content-Type: application/json" -XPOST 
http:///jobs//savepoints -d 
'{"target-directory":"s3://mys3bucket/savepointdirectory/testMay22-sp1/"}' 

But I am getting the following error:
{"errors":["Request did not match expected format 
SavepointTriggerRequestBody."]}

Can you please let me know what I could be missing ?Thanks






Re: Apache Flink - Question about application restart

2020-05-25 Thread M Singh
 Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks



  

Re: Apache Flink - Error on creating savepoints using REST interface

2020-05-25 Thread M Singh
 Hi Chesney:
The SavepointTriggerRequestBody indicates defaultValue for cancel-job 
attribute, so is it not being honored ?
https://github.com/apache/flink/blob/release-1.6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java

Thanks
On Saturday, May 23, 2020, 10:17:27 AM EDT, Chesnay Schepler 
 wrote:  
 
  You also have to set the boolean cancel-job parameter.
  
  On 22/05/2020 22:47, M Singh wrote:
  
 
 Hi: 
  I am using Flink 1.6.2 and trying to create a savepoint using the following 
curl command using the following references 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html)
 and 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
 
curl -v -H "Content-Type: application/json" -XPOST 
http:///jobs//savepoints -d 
'{"target-directory":"s3://mys3bucket/savepointdirectory/testMay22-sp1/"}' But 
I am getting the following error:{"errors":["Request did not match expected 
format SavepointTriggerRequestBody."]}Can you please let me know what I could 
be missing ?Thanks   

 
   

Re: Apache Flink - Question about application restart

2020-05-26 Thread M Singh
 Hi Zhu Zhu:
I have another clafication - it looks like if I run the same app multiple times 
- it's job id changes.  So it looks like even though the graph is the same the 
job id is not dependent on the job graph only since with different runs of the 
same app it is not the same.
Please let me know if I've missed anything.
Thanks
On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh  
wrote:  
 
  Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks





ClusterClientFactory selection

2020-05-26 Thread M Singh
Hi:
I wanted to find out which parameter/configuration allows flink cli pick up the 
appropriate cluster client factory (especially in the yarn mode).
Thanks

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
 Hi Till/Zhu/Yang:  Thanks for your replies.
So just to clarify - the job id remains same if the job restarts have not been 
exhausted.  Does Yarn also resubmit the job in case of failures and if so, then 
is the job id different.
ThanksOn Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
if you submit the same job multiple times, then it will get every time a 
different JobID assigned. For Flink, different job submissions are considered 
to be different jobs. Once a job has been submitted, it will keep the same 
JobID which is important in order to retrieve the checkpoints associated with 
this job.
Cheers,Till
On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:

 Hi Zhu Zhu:
I have another clafication - it looks like if I run the same app multiple times 
- it's job id changes.  So it looks like even though the graph is the same the 
job id is not dependent on the job graph only since with different runs of the 
same app it is not the same.
Please let me know if I've missed anything.
Thanks
On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh  
wrote:  
 
  Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks




  

Re: ClusterClientFactory selection

2020-05-28 Thread M Singh
 HI Kostas/Yang/Lake:
I am looking at aws emr and did not see the execution.target in the 
flink-conf.yaml file under flink/conf directory. Is it defined in another place 
?  
 I also did search in the current flink source code and did find mention of it 
in the md files but not in any property file or the flink-yarn sub module.  
Please let me know if I am missing anything.
Thanks
On Wednesday, May 27, 2020, 03:51:28 AM EDT, Kostas Kloudas 
 wrote:  
 
 Hi Singh,

The only thing to add to what Yang said is that the "execution.target"
configuration option (in the config file) is also used for the same
purpose from the execution environments.

Cheers,
Kostas

On Wed, May 27, 2020 at 4:49 AM Yang Wang  wrote:
>
> Hi M Singh,
>
> The Flink CLI picks up the correct ClusterClientFactory via java SPI. You
> could check YarnClusterClientFactory#isCompatibleWith for how it is activated.
> The cli option / configuration is "-e/--executor" or execution.target (e.g. 
> yarn-per-job).
>
>
> Best,
> Yang
>
> M Singh  于2020年5月26日周二 下午6:45写道:
>>
>> Hi:
>>
>> I wanted to find out which parameter/configuration allows flink cli pick up 
>> the appropriate cluster client factory (especially in the yarn mode).
>>
>> Thanks  

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
 Thanks Till - in the case of restart of flink master - I believe the jobid 
will be different.  Thanks
On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
Yarn won't resubmit the job. In case of a process failure where Yarn restarts 
the Flink Master, the Master will recover the submitted jobs from a persistent 
storage system.
Cheers,Till
On Thu, May 28, 2020 at 4:05 PM M Singh  wrote:

 Hi Till/Zhu/Yang:  Thanks for your replies.
So just to clarify - the job id remains same if the job restarts have not been 
exhausted.  Does Yarn also resubmit the job in case of failures and if so, then 
is the job id different.
ThanksOn Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
if you submit the same job multiple times, then it will get every time a 
different JobID assigned. For Flink, different job submissions are considered 
to be different jobs. Once a job has been submitted, it will keep the same 
JobID which is important in order to retrieve the checkpoints associated with 
this job.
Cheers,Till
On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:

 Hi Zhu Zhu:
I have another clafication - it looks like if I run the same app multiple times 
- it's job id changes.  So it looks like even though the graph is the same the 
job id is not dependent on the job graph only since with different runs of the 
same app it is not the same.
Please let me know if I've missed anything.
Thanks
On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh  
wrote:  
 
  Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks




  
  

Stopping a job

2020-06-04 Thread M Singh
Hi:
I am running a job which consumes data from Kinesis and send data to another 
Kinesis queue.  I am using an older version of Flink (1.6), and when I try to 
stop the job I get an exception 

 

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) 
failed: This job is not stoppable.]


I wanted to find out what is a stoppable job and it possible to make a job 
stoppable if is reading/writing to kinesis ?
Thanks




Re: Stopping a job

2020-06-06 Thread M Singh
 Hi Arvid:
I check the link and it indicates that only Storm SpoutSource, TwitterSource 
and NifiSource support stop.   
Does this mean that FlinkKinesisConsumer is not stoppable ?
Also, can you please point me to the Stoppable interface mentioned in the link 
?  I found the following but am not sure if TwitterSource implements it 
:https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
Thanks




On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise  
wrote:  
 
 Hi,
could you check if this SO thread [1] helps you already?
[1] 
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:

Hi:
I am running a job which consumes data from Kinesis and send data to another 
Kinesis queue.  I am using an older version of Flink (1.6), and when I try to 
stop the job I get an exception 

 

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) 
failed: This job is not stoppable.]


I wanted to find out what is a stoppable job and it possible to make a job 
stoppable if is reading/writing to kinesis ?
Thanks





-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      

Re: Stopping a job

2020-06-06 Thread M Singh
 
Hi Arvid:   
Thanks for the links.  
A few questions:
1. Is there any particular interface in 1.9+ that identifies the source as 
stoppable ?2. Is there any distinction b/w stop and cancel  in 1.9+ ?3. Is 
there any list of sources which are documented as stoppable besides the one 
listed in your SO link ?4. In 1.9+ there is flink stop command and a flink 
cancel command. 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop).  So 
it appears that flink stop will take a savepoint and the call cancel, and 
cancel will just cancel the job (looks like cancel with savepoint is deprecated 
in 1.10).  
Thanks again for your help.


On Saturday, June 6, 2020, 02:18:57 PM EDT, Arvid Heise 
 wrote:  
 
 Yes, it seems as if FlinkKinesisConsumer does not implement it.
Here are the links to the respective javadoc [1] and code [2]. Note that in 
later releases (1.9+) this interface has been removed. Stop is now implemented 
through a cancel() on source level.
In general, I don't think that in a Kinesis to Kinesis use case, stop is needed 
anyways, since there is no additional consistency expected over a normal cancel.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html[2]
 
https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
On Sat, Jun 6, 2020 at 8:03 PM M Singh  wrote:

 Hi Arvid:
I check the link and it indicates that only Storm SpoutSource, TwitterSource 
and NifiSource support stop.   
Does this mean that FlinkKinesisConsumer is not stoppable ?
Also, can you please point me to the Stoppable interface mentioned in the link 
?  I found the following but am not sure if TwitterSource implements it 
:https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
Thanks




On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise  
wrote:  
 
 Hi,
could you check if this SO thread [1] helps you already?
[1] 
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:

Hi:
I am running a job which consumes data from Kinesis and send data to another 
Kinesis queue.  I am using an older version of Flink (1.6), and when I try to 
stop the job I get an exception 

 

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) 
failed: This job is not stoppable.]


I wanted to find out what is a stoppable job and it possible to make a job 
stoppable if is reading/writing to kinesis ?
Thanks





-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      


-- 

Arvid Heise | Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng      

Flink savepoints history

2020-06-07 Thread M Singh
Hi:
I wanted to find out if we can access the savepoints created for a job or all 
jobs using Flink CLI or REST API.  
I took a look at the CLI (Apache Flink 1.10 Documentation: Command-Line 
Interface) and REST API 
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/rest_api.html),
 but did not find this information.
I do see the Flink jobmanager UI show the savepoints - but I am not sure how it 
access that information.
Please let me know how I can access the savepoints information.
Thanks.




Re: Stopping a job

2020-06-08 Thread M Singh
 Thanks Kostas, Arvid, and Senthil for your help.
On Monday, June 8, 2020, 12:47:56 PM EDT, Senthil Kumar 
 wrote:  
 
 #yiv1043440718 #yiv1043440718 -- _filtered {} _filtered {} _filtered {} 
_filtered {} _filtered {}#yiv1043440718 #yiv1043440718 
p.yiv1043440718MsoNormal, #yiv1043440718 li.yiv1043440718MsoNormal, 
#yiv1043440718 div.yiv1043440718MsoNormal 
{margin:0in;margin-bottom:.0001pt;font-size:11.0pt;font-family:sans-serif;}#yiv1043440718
 a:link, #yiv1043440718 span.yiv1043440718MsoHyperlink 
{color:blue;text-decoration:underline;}#yiv1043440718 a:visited, #yiv1043440718 
span.yiv1043440718MsoHyperlinkFollowed 
{color:purple;text-decoration:underline;}#yiv1043440718 
p.yiv1043440718msonormal0, #yiv1043440718 li.yiv1043440718msonormal0, 
#yiv1043440718 div.yiv1043440718msonormal0 
{margin-right:0in;margin-left:0in;font-size:11.0pt;font-family:sans-serif;}#yiv1043440718
 span.yiv1043440718EmailStyle19 
{font-family:sans-serif;color:windowtext;}#yiv1043440718 
.yiv1043440718MsoChpDefault {font-size:10.0pt;} _filtered {}#yiv1043440718 
div.yiv1043440718WordSection1 {}#yiv1043440718 
I am just stating this for completeness.
 
  
 
When a job is cancelled, Flink sends an Interrupt signal to the Thread running 
the Source.run method
 
  
 
For some reason (unknown to me), this does not happen when a Stop command is 
issued.
 
  
 
We ran into some minor issues because of said behavior.
 
  
 
From: Kostas Kloudas 
Date: Monday, June 8, 2020 at 2:35 AM
To: Arvid Heise 
Cc: M Singh , User-Flink 
Subject: Re: Stopping a job
 
  
 
What Arvid said is correct. 
 
The only thing I have to add is that "stop" allows also exactly-once sinks to 
push out their buffered data to their final destination (e.g. Filesystem). In 
other words, it takes into account side-effects, so it guarantees exactly-once 
end-to-end, assuming that you are using exactly-once sources and sinks. Cancel 
with savepoint on the other hand did not necessarily and committing 
side-effects is was following a "best-effort" approach.
 
  
 
For more information you can check [1].
 
  
 
Cheers,
 
Kostas 
 
  
 
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
 
  
 
On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise  wrote:
 

It was before I joined the dev team, so the following are kind of speculative:
 
  
 
The concept of stoppable functions never really took off as it was a bit of a 
clumsy approach. There is no fundamental difference between stopping and 
cancelling on (sub)task level. Indeed if you look in the twitter source of 1.6 
[1], cancel() and stop() are doing the exact same thing. I'd assume that this 
is probably true for all sources.
 
  
 
So what is the difference between cancel and stop then? It's more the way on 
how you terminate the whole DAG. On cancelling, you cancel() on all tasks more 
or less simultaneously. If you want to stop, it's more a fine-grain cancel, 
where you stop first the sources and then let the tasks close themselves when 
all upstream tasks are done. Just before closing the tasks, you also take a 
snapshot. Thus, the difference should not be visible in user code but only in 
the Flink code itself (task/checkpoint coordinator)
 
  
 
So for your question:
 
1. No, as on task level stop() and cancel() are the same thing on UDF level.
 
2. Yes, stop will be more graceful and creates a snapshot. [2] 
 
3. Not that I am aware of. In the whole flink code base, there are no more (see 
javadoc). You could of course check if there are some in Bahir. But it 
shouldn't really matter. There is no huge difference between stopping and 
cancelling if you wait for a checkpoint to finish. 
 
4. Okay you answered your second question ;) Yes cancel with savepoint = stop 
now to make it easier for new users.
 
  
 
[1] 
https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
 
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
 
  
 
On Sun, Jun 7, 2020 at 1:04 AM M Singh  wrote:
 

  
 
Hi Arvid:   
 
  
 
Thanks for the links.  
 
  
 
A few questions:
 
  
 
1. Is there any particular interface in 1.9+ that identifies the source as 
stoppable ?
 
2. Is there any distinction b/w stop and cancel  in 1.9+ ?
 
3. Is there any list of sources which are documented as stoppable besides the 
one listed in your SO link ?
 
4. In 1.9+ there is flink stop command and a flink cancel command. 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop).  So 
it appears that flink stop will take a savepoint and the call cancel, and 
cancel will just cancel the job (looks like cancel with savepoint is deprecated 
in 1.10).  
 
  
 
Thanks again for your help.
 
  
 
  
 
  
 
On Saturday, June 6, 2020, 02:18:57 PM EDT, Arvid Heise  
wrote:
 
  
 
  
 
Yes, it seems as if FlinkKinesisConsumer does not im

Kinesis ProvisionedThroughputExceededException

2020-06-15 Thread M Singh
Hi:
I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same kinesis stream and get 
ProvisionedThroughputExceededException exception which fails the job.

I have seen a reference 
http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
 - which indicates there might be some solution perhaps in Flink 1.8/1.9.  

I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
ProvisionedThroughputExceededException - ASF JIRA is still open.

So i wanted to find out 
1. If this issue has been resolved and if so in which version ?2. Is there any 
kinesis consumer with kinesis fanout available that can help address this issue 
?3. Is there any specific parameter in kinesis consumer config that can address 
this issue ?
If there is any other pointer/documentation/reference, please let me know.
Thanks


Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread M Singh
 
Thanks Roman for your response and advice.
>From my understanding increasing shards will increase throughput but still if 
>more than 5 requests are made per shard/per second, and since we have 20 apps 
>(and increasing) then the exception might occur. 
Please let me know if I have missed anything.
MansOn Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov 
 wrote:  
 
 #yiv4708954190 p.yiv4708954190MsoNormal, #yiv4708954190 
p.yiv4708954190MsoNoSpacing{margin:0;}Hi, 

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting op limit for kinesis with default kinesis consumer 
settings and it does an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") 
// we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // 
in case of throughput error, initial timeout is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "1") // 
we can go up to 10s pause
    
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, 
"1.5") // multiplying pause to 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and 
make up to 100 retries

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same kinesis stream and get 
ProvisionedThroughputExceededException exception which fails the job.
I have seen a reference 
http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
 - which indicates there might be some solution perhaps in Flink 1.8/1.9.  

I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
ProvisionedThroughputExceededException - ASF JIRA is still open.


So i wanted to find out 

1. If this issue has been resolved and if so in which version ?
2. Is there any kinesis consumer with kinesis fanout available that can help 
address this issue ?
3. Is there any specific parameter in kinesis consumer config that can address 
this issue ?

If there is any other pointer/documentation/reference, please let me know.

Thanks



  

Re: Kinesis ProvisionedThroughputExceededException

2020-06-18 Thread M Singh
 Thanks Roman for your response.  Mans
On Wednesday, June 17, 2020, 03:26:31 AM EDT, Roman Grebennikov 
 wrote:  
 
 #yiv4075825537 p.yiv4075825537MsoNormal, #yiv4075825537 
p.yiv4075825537MsoNoSpacing{margin:0;}Hi,

It will occur if your job will reach SHARD_GETRECORDS_RETRIES consecutive 
failed attempts to pull the data from kinesis.
So if you scale up the topic in kinesis and tune a bit backoff parameters, you 
will lower the probability of this exception almost to zero (but with increased 
costs and worst-case latency).

But yes, this is a main drawback of managed solutions - as far as you reach a 
significant load, you need to pay more. Other managed option within AWS is to 
switch to MSK, managed Kafka, which has no such significant restrictions.

And the final option is to wait until FLINK-17688 will be implemented (using 
Kinesis enhanced fan-out, so Kinesis will push the data to consumer, instead of 
consumer periodically pulling the data).

Roman Grebennikov | g...@dfdx.me


On Wed, Jun 17, 2020, at 04:39, M Singh wrote:



Thanks Roman for your response and advice.

>From my understanding increasing shards will increase throughput but still if 
>more than 5 requests are made per shard/per second, and since we have 20 apps 
>(and increasing) then the exception might occur. 

Please let me know if I have missed anything.

Mans
On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov  
wrote:


Hi, 

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting op limit for kinesis with default kinesis consumer 
settings and it does an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") 
// we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // 
in case of throughput error, initial timeout is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "1") // 
we can go up to 10s pause
    
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, 
"1.5") // multiplying pause to 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and 
make up to 100 retries

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same kinesis stream and get 
ProvisionedThroughputExceededException exception which fails the job.
I have seen a reference 

http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
 - which indicates there might be some solution perhaps in Flink 1.8/1.9.  

I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
ProvisionedThroughputExceededException - ASF JIRA is still open.


So i wanted to find out 

1. If this issue has been resolved and if so in which version ?
2. Is there any kinesis consumer with kinesis fanout available that can help 
address this issue ?
3. Is there any specific parameter in kinesis consumer config that can address 
this issue ?

If there is any other pointer/documentation/reference, please let me know.

Thanks





  

Flink AskTimeoutException killing the jobs

2020-07-02 Thread M Singh
Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions which is causing the flink jobs to die.   
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that the recipient actor didn't 
send a reply.at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 more

Can you please let me know where can I set the timeout for this timeout ? 
I could not find this specific timeout in the flink doc - Apache Flink 1.10 
Documentation: Configuration.

Thanks
Mans

Re: Flink AskTimeoutException killing the jobs

2020-07-03 Thread M Singh
 Hi Xintong/LakeShen:
We have the following setting in flink-conf.yaml

akka.ask.timeout: 180 s

akka.tcp.timeout: 180 s


But still see this exception.  Are there multiple akka.ask.timeout or 
additional settings required ?

Thanks
Mans
On Friday, July 3, 2020, 01:08:05 AM EDT, Xintong Song 
 wrote:  
 
 The configuration option you're looking for is `akka.ask.timeout`.




However, I'm not sure increasing this configuration would help in your case. 
The error message shows that there is a timeout on a local message. It is wired 
a local message does not get replied within 10 sec. I would suggest to look 
into the jobmanager logs and gc logs, see if there's any problem that prevent 
the process from handling the rpc messages timely.




Thank you~

Xintong Song




On Fri, Jul 3, 2020 at 3:51 AM M Singh  wrote:

Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions which is causing the flink jobs to die.   
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that the recipient actor didn't 
send a reply.at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 more

Can you please let me know where can I set the timeout for this timeout ? 
I could not find this specific timeout in the flink doc - Apache Flink 1.10 
Documentation: Configuration.

Thanks
Mans
  

Re: Flink AskTimeoutException killing the jobs

2020-07-06 Thread M Singh
 Thanks Xintong.  I will check the logs.  
On Sunday, July 5, 2020, 09:29:31 PM EDT, Xintong Song 
 wrote:  
 
 As I already mentioned,

I would suggest to look into the jobmanager logs and gc logs, see if there's 
any problem that prevent the process from handling the rpc messages timely.


The Akka ask timeout does not seem to be the root problem to me.

Thank you~

Xintong Song




On Sat, Jul 4, 2020 at 12:12 AM M Singh  wrote:

 Hi Xintong/LakeShen:
We have the following setting in flink-conf.yaml

akka.ask.timeout: 180 s

akka.tcp.timeout: 180 s


But still see this exception.  Are there multiple akka.ask.timeout or 
additional settings required ?

Thanks
Mans
On Friday, July 3, 2020, 01:08:05 AM EDT, Xintong Song 
 wrote:  
 
 The configuration option you're looking for is `akka.ask.timeout`.




However, I'm not sure increasing this configuration would help in your case. 
The error message shows that there is a timeout on a local message. It is wired 
a local message does not get replied within 10 sec. I would suggest to look 
into the jobmanager logs and gc logs, see if there's any problem that prevent 
the process from handling the rpc messages timely.




Thank you~

Xintong Song




On Fri, Jul 3, 2020 at 3:51 AM M Singh  wrote:

Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions which is causing the flink jobs to die.   
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that the recipient actor didn't 
send a reply.at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 more

Can you please let me know where can I set the timeout for this timeout ? 
I could not find this specific timeout in the flink doc - Apache Flink 1.10 
Documentation: Configuration.

Thanks
Mans
  
  

Apache Flink - Operator name and uuid best practices

2019-11-16 Thread M Singh
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid attributes ?2. 
Are there any restrictions on the characters used for name and uuid (blank 
spaces, etc) ?3. Can the name and uuid be the same ?
Please let me know if there is any other advice.
Thanks
Mans

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-16 Thread M Singh
 Thanks Jiayi for your response. I am thinking on the same lines.  
Regarding using the same name and uuid, I believe the checkpoint state for an 
operator will be easy to identify if the uuid is the same as name.  But I am 
not sure if having a very long name and uuid or a character like parenthesis, 
etc might cause any issues, so just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!




Firstly let’s see how operator’s name and uid is used. AFAIK, operator’s name 
is used in WebUI and metrics reporting, and uid is used to mark the uniqueness 
of operator which is useful when you’re using state[1].




> Are there any restrictions on the length of the name and uuid attributes?

It’s pretty much the same as you define a string value, so there is no special 
restrictions on this.




> Are there any restrictions on the characters used for name and uuid (blank 
>spaces, etc) ?

I’m not a hundred percent sure about this but I run a testing program and it 
works fine.




> Can the name and uuid be the same ? 

Yes. But uuids accross operators cannot be same.




For me I usually set name and uuid for almost every operator, which gives me 
better experience in monitoring and scaling.




Hope this helps.







[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state







Best,



Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid attributes ?2. 
Are there any restrictions on the characters used for name and uuid (blank 
spaces, etc) ?3. Can the name and uuid be the same ?
Please let me know if there is any other advice.
Thanks
Mans



 
  

Apache Airflow - Question about checkpointing and re-run a job

2019-11-16 Thread M Singh
Hi:
I have a Flink job and sometimes I need to cancel and re run it.  From what I 
understand the checkpoints for a job are saved under the job id directory at 
the checkpoint location. If I run the same job again, it will get a new job id 
and the checkpoint saved from the previous run job (which is saved under the 
previous job's id dir) will not be used for this new run. Is that a correct 
understanding ?  If I need to re-run the job from the previous checkpoint - is 
there any way to do that automatically without using a savepoint ?
Also, I believe the internal job restarts do not change the job id so in those 
cases where the job restarts will pick the state from the saved checkpoint.  Is 
my understanding correct ?
Thanks
Mans

Re: Apache Airflow - Question about checkpointing and re-run a job

2019-11-17 Thread M Singh
 Folks - Please let me know if you have any advice on this question.  Thanks
On Saturday, November 16, 2019, 02:39:18 PM EST, M Singh 
 wrote:  
 
 Hi:
I have a Flink job and sometimes I need to cancel and re run it.  From what I 
understand the checkpoints for a job are saved under the job id directory at 
the checkpoint location. If I run the same job again, it will get a new job id 
and the checkpoint saved from the previous run job (which is saved under the 
previous job's id dir) will not be used for this new run. Is that a correct 
understanding ?  If I need to re-run the job from the previous checkpoint - is 
there any way to do that automatically without using a savepoint ?
Also, I believe the internal job restarts do not change the job id so in those 
cases where the job restarts will pick the state from the saved checkpoint.  Is 
my understanding correct ?
Thanks
Mans  

Re: Apache Airflow - Question about checkpointing and re-run a job

2019-11-18 Thread M Singh
 Thanks Congxian for your answer and reference.  Mans
On Sunday, November 17, 2019, 08:59:16 PM EST, Congxian Qiu 
 wrote:  
 
 HiYes, checkpoint data locates under jobid dir. you can try to restore from 
the retained checkpoint[1][1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
Best,Congxian

M Singh  于2019年11月18日周一 上午2:54写道:

 Folks - Please let me know if you have any advice on this question.  Thanks
On Saturday, November 16, 2019, 02:39:18 PM EST, M Singh 
 wrote:  
 
 Hi:
I have a Flink job and sometimes I need to cancel and re run it.  From what I 
understand the checkpoints for a job are saved under the job id directory at 
the checkpoint location. If I run the same job again, it will get a new job id 
and the checkpoint saved from the previous run job (which is saved under the 
previous job's id dir) will not be used for this new run. Is that a correct 
understanding ?  If I need to re-run the job from the previous checkpoint - is 
there any way to do that automatically without using a savepoint ?
Also, I believe the internal job restarts do not change the job id so in those 
cases where the job restarts will pick the state from the saved checkpoint.  Is 
my understanding correct ?
Thanks
Mans  
  

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-20 Thread M Singh
 Hi Arvid:
Thanks for your clarification.
I am giving supplying uid for the stateful operators and find the following 
directory structure on in the chkpoint directory:

f4e78cb47f9dc12859558be7d15f39d0/chk-6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d
The first part f4e78cb47f9dc12859558be7d15f39d0 is the job_idIs there a way to 
map the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) -  to the uid 
assigned in the application ?

Thanks 
On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise 
 wrote:  
 
 Hi Mans,
just to follow up. There are no limitations for name or uuid. 

The uuid will be in fact hashed internally while the StreamGraph is being 
generated, so all characters are allowed.The name is only for debugging 
purposes and web ui. If you use very special characters, you may see oddities 
in logs/web ui, but nothing should break. 
Spaces or parentheses should work in any case.

Best,
Arvid

On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:

 Thanks Jiayi for your response. I am thinking on the same lines.  
Regarding using the same name and uuid, I believe the checkpoint state for an 
operator will be easy to identify if the uuid is the same as name.  But I am 
not sure if having a very long name and uuid or a character like parenthesis, 
etc might cause any issues, so just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!




Firstly let’s see how operator’s name and uid is used. AFAIK, operator’s name 
is used in WebUI and metrics reporting, and uid is used to mark the uniqueness 
of operator which is useful when you’re using state[1].




> Are there any restrictions on the length of the name and uuid attributes?

It’s pretty much the same as you define a string value, so there is no special 
restrictions on this.




> Are there any restrictions on the characters used for name and uuid (blank 
>spaces, etc) ?

I’m not a hundred percent sure about this but I run a testing program and it 
works fine.




> Can the name and uuid be the same ? 

Yes. But uuids accross operators cannot be same.




For me I usually set name and uuid for almost every operator, which gives me 
better experience in monitoring and scaling.




Hope this helps.







[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state







Best,



Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid attributes ?2. 
Are there any restrictions on the characters used for name and uuid (blank 
spaces, etc) ?3. Can the name and uuid be the same ?
Please let me know if there is any other advice.
Thanks
Mans



 
  
  

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
 Hi Congxian:
For my application i see many uuids under the chk-6 directory ( I posted one in 
the sample above).   I am trying to understand that if I restart the 
application with this checkpoint (which I believe I can just like a savepoint - 
I am using chk-6 as an example below)1. I believe each chk- - is a complete 
state of checkpoint. Is that correct ?2. How to I point it to the checkpoint 
(chk-6) when I submit the job - Do I point it to the jobid or the chk-6 
directory ?  I am presuming the latter (ie, pointing to the chk06 directory but 
just want to confirm.3. Secondly, how does the application map the the files 
under the chk-6 to restore the state of each of the stateful operators ?4. Is 
there any API by which I can examine the contents of the files under the chk-6 
directory ?
Thanks for your help.
Mans
On Wednesday, November 20, 2019, 09:13:39 PM EST, Congxian Qiu 
 wrote:  
 
 Hi
Currently, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) is 
generated by UUID.randomUUID(), so there is not a easy way to map this to the 
assigned in the application.In another word, the last part (uuid 
a4d87cda-2afd-47d4-8d3f-b0658466fb2d) belongs to one checkpoint, and the 
assigned in the application belongs to one operator, they are different.
Best,Congxian

M Singh  于2019年11月21日周四 上午6:18写道:

 Hi Arvid:
Thanks for your clarification.
I am giving supplying uid for the stateful operators and find the following 
directory structure on in the chkpoint directory:

f4e78cb47f9dc12859558be7d15f39d0/chk-6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d
The first part f4e78cb47f9dc12859558be7d15f39d0 is the job_idIs there a way to 
map the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) -  to the uid 
assigned in the application ?

Thanks 
On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise 
 wrote:  
 
 Hi Mans,
just to follow up. There are no limitations for name or uuid. 

The uuid will be in fact hashed internally while the StreamGraph is being 
generated, so all characters are allowed.The name is only for debugging 
purposes and web ui. If you use very special characters, you may see oddities 
in logs/web ui, but nothing should break. 
Spaces or parentheses should work in any case.

Best,
Arvid

On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:

 Thanks Jiayi for your response. I am thinking on the same lines.  
Regarding using the same name and uuid, I believe the checkpoint state for an 
operator will be easy to identify if the uuid is the same as name.  But I am 
not sure if having a very long name and uuid or a character like parenthesis, 
etc might cause any issues, so just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!




Firstly let’s see how operator’s name and uid is used. AFAIK, operator’s name 
is used in WebUI and metrics reporting, and uid is used to mark the uniqueness 
of operator which is useful when you’re using state[1].




> Are there any restrictions on the length of the name and uuid attributes?

It’s pretty much the same as you define a string value, so there is no special 
restrictions on this.




> Are there any restrictions on the characters used for name and uuid (blank 
>spaces, etc) ?

I’m not a hundred percent sure about this but I run a testing program and it 
works fine.




> Can the name and uuid be the same ? 

Yes. But uuids accross operators cannot be same.




For me I usually set name and uuid for almost every operator, which gives me 
better experience in monitoring and scaling.




Hope this helps.







[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state







Best,



Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid attributes ?2. 
Are there any restrictions on the characters used for name and uuid (blank 
spaces, etc) ?3. Can the name and uuid be the same ?
Please let me know if there is any other advice.
Thanks
Mans



 
  
  
  

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
 Thanks so much Congxian for your pointers - I will try to go through them.  
Mans
On Thursday, November 21, 2019, 07:26:49 AM EST, Congxian Qiu 
 wrote:  
 
 Hi1. I think this issue[1] can help to understand the directory layout2. chk-6 
directory or the metafilePath, for more information, you can ref to[2][3]3. 
every checkpoint contains a meta-file record such, maybe you can ref to[4], and 
debug it for more information4. currently, you need to go through  the restore 
logic to see if the files' contents
[1] https://issues.apache.org/jira/browse/FLINK-8531[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1153[3]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint[4]
 
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,Congxian

M Singh  于2019年11月21日周四 下午7:44写道:

 Hi Congxian:
For my application i see many uuids under the chk-6 directory ( I posted one in 
the sample above).   I am trying to understand that if I restart the 
application with this checkpoint (which I believe I can just like a savepoint - 
I am using chk-6 as an example below)1. I believe each chk- - is a complete 
state of checkpoint. Is that correct ?2. How to I point it to the checkpoint 
(chk-6) when I submit the job - Do I point it to the jobid or the chk-6 
directory ?  I am presuming the latter (ie, pointing to the chk06 directory but 
just want to confirm.3. Secondly, how does the application map the the files 
under the chk-6 to restore the state of each of the stateful operators ?4. Is 
there any API by which I can examine the contents of the files under the chk-6 
directory ?
Thanks for your help.
Mans
On Wednesday, November 20, 2019, 09:13:39 PM EST, Congxian Qiu 
 wrote:  
 
 Hi
Currently, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) is 
generated by UUID.randomUUID(), so there is not a easy way to map this to the 
assigned in the application.In another word, the last part (uuid 
a4d87cda-2afd-47d4-8d3f-b0658466fb2d) belongs to one checkpoint, and the 
assigned in the application belongs to one operator, they are different.
Best,Congxian

M Singh  于2019年11月21日周四 上午6:18写道:

 Hi Arvid:
Thanks for your clarification.
I am giving supplying uid for the stateful operators and find the following 
directory structure on in the chkpoint directory:

f4e78cb47f9dc12859558be7d15f39d0/chk-6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d
The first part f4e78cb47f9dc12859558be7d15f39d0 is the job_idIs there a way to 
map the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) -  to the uid 
assigned in the application ?

Thanks 
On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise 
 wrote:  
 
 Hi Mans,
just to follow up. There are no limitations for name or uuid. 

The uuid will be in fact hashed internally while the StreamGraph is being 
generated, so all characters are allowed.The name is only for debugging 
purposes and web ui. If you use very special characters, you may see oddities 
in logs/web ui, but nothing should break. 
Spaces or parentheses should work in any case.

Best,
Arvid

On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:

 Thanks Jiayi for your response. I am thinking on the same lines.  
Regarding using the same name and uuid, I believe the checkpoint state for an 
operator will be easy to identify if the uuid is the same as name.  But I am 
not sure if having a very long name and uuid or a character like parenthesis, 
etc might cause any issues, so just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!




Firstly let’s see how operator’s name and uid is used. AFAIK, operator’s name 
is used in WebUI and metrics reporting, and uid is used to mark the uniqueness 
of operator which is useful when you’re using state[1].




> Are there any restrictions on the length of the name and uuid attributes?

It’s pretty much the same as you define a string value, so there is no special 
restrictions on this.




> Are there any restrictions on the characters used for name and uuid (blank 
>spaces, etc) ?

I’m not a hundred percent sure about this but I run a testing program and it 
works fine.




> Can the name and uuid be the same ? 

Yes. But uuids accross operators cannot be same.




For me I usually set name and uuid for almost every operator, which gives me 
better experience in monitoring and scaling.




Hope this helps.







[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state







Best,



Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the 

Apache Flink - Uid and name for Flink sources and sinks

2019-11-21 Thread M Singh
Hi Folks:
I am assigning uid and name for all stateful processors in our application and 
wanted to find out the following:
1. Should we assign uid and name to the sources and sinks too ?  2. What are 
the pros and cons of adding uid to sources and sinks ?3. The sinks have uid and 
hashUid - which is the preferred attribute to use  for allowing job restarts 
?4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?  5. Can the sinks and 
sources without uid restart from savepoints ?6. The data streams have an 
attribute id -  How is this generated and can this be used for creating a uid 
for the sink ?  
Thanks for your help.
Mans

Re: Apache Flink - Uid and name for Flink sources and sinks

2019-11-22 Thread M Singh
 
Hi Folks - Please let me know if you have any advice on the best practices for 
setting uid for sources and sinks.  Thanks.  MansOn Thursday, November 21, 
2019, 10:10:49 PM EST, M Singh  wrote:  
 
 Hi Folks:
I am assigning uid and name for all stateful processors in our application and 
wanted to find out the following:
1. Should we assign uid and name to the sources and sinks too ?  2. What are 
the pros and cons of adding uid to sources and sinks ?3. The sinks have uid and 
hashUid - which is the preferred attribute to use  for allowing job restarts 
?4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?  5. Can the sinks and 
sources without uid restart from savepoints ?6. The data streams have an 
attribute id -  How is this generated and can this be used for creating a uid 
for the sink ?  
Thanks for your help.
Mans  

Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-22 Thread M Singh
Hi:
I have a flink application in which some of the operators have uid and name and 
some stateless ones don't.
I've taken a save point and tried to start another instance of the application 
from a savepoint - I get the following exception which indicates that the 
operator is not available to the new program even though the second job is the 
same as first but just running from the first jobs savepoint.

 

Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
 Cannot map checkpoint/savepoint state for operator 
d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

 at 
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)

 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)

 at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)

 at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)

 at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)

 at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)

 ... 10 more




I've tried to start an application instance from the checkpoint too of the 
first instance but it gives the same exception indicating that the operator is 
not available.
Questions:
1. If this a problem because some of the operators don't have uid ?2. Is it 
required to have uids even for stateless operators like simple map or filter 
operators ?3. Is there a way to find out which operator is not available in the 
new application even though I am running the same application ?4. Is there a 
way to figure out if this is the only missing operator or are there others 
whose mapping is missing for the second instance run ?5. Is this issue resolved 
in Apache Flink 1.9 (since I am still using Flink 1.6)
If there any additional pointers please let me know.
Thanks
Mans



Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-23 Thread M Singh
 
Hey Folks:   
Please let me know how to resolve this issue since using 
--allowNonRestoredState without knowing if any state will be lost seems risky.
ThanksOn Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
 wrote:  
 
 Hi:
I have a flink application in which some of the operators have uid and name and 
some stateless ones don't.
I've taken a save point and tried to start another instance of the application 
from a savepoint - I get the following exception which indicates that the 
operator is not available to the new program even though the second job is the 
same as first but just running from the first jobs savepoint.

 

Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint 
s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
 Cannot map checkpoint/savepoint state for operator 
d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.

 at 
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)

 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)

 at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)

 at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)

 at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)

 at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)

 ... 10 more




I've tried to start an application instance from the checkpoint too of the 
first instance but it gives the same exception indicating that the operator is 
not available.
Questions:
1. If this a problem because some of the operators don't have uid ?2. Is it 
required to have uids even for stateless operators like simple map or filter 
operators ?3. Is there a way to find out which operator is not available in the 
new application even though I am running the same application ?4. Is there a 
way to figure out if this is the only missing operator or are there others 
whose mapping is missing for the second instance run ?5. Is this issue resolved 
in Apache Flink 1.9 (since I am still using Flink 1.6)
If there any additional pointers please let me know.
Thanks
Mans

  

Re: Apache Flink - Uid and name for Flink sources and sinks

2019-11-24 Thread M Singh
 Thanks Dian for your answers.
A few more questions:
1. If I do not assign uids to operators/sources and sinks - I am assuming the 
framework assigns it one.  Now how does another run of the the same application 
using the previous runs savepoint/checkpoint match it's tasks/operators to the 
savepoint/checkpoint state of the application ? 2. Is the operatorID in the 
checkpoint state the same as uid ?  3. Do you have any pointer as to how an 
operatorID is generated for the checkpoint and who can it be mapped to back to 
the operator for troubleshooting purposes ?

Regarding id attribute - I meant the following:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L139
However, I realized that this is not unique across applications runs and so not 
a good candidate.
Thanks again for your help.





On Sunday, November 24, 2019, 04:55:55 AM EST, Dian Fu 
 wrote:  
 
 1. Should we assign uid and name to the sources and sinks too ?  
>> If the sources/sinks have used state, you should assign uid for them. This 
>> is usually true for sources. 

2. What are the pros and cons of adding uid to sources and sinks ?
>> I'm not seeing the cons for assigning uid to sources and sinks. So I guess 
>> assigning the uids for sources/sinks is always a good practice.

3. The sinks have uid and hashUid - which is the preferred attribute to use  
for allowing job restarts ?
>> Could you see if this could answer you question: 
>> https://stackoverflow.com/questions/46112142/apache-flink-set-operator-uid-vs-uidhash

4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?>> It depends on whether 
the sources/sinks uses state. I think most sources use state to maintaining the 
read offset.  5. Can the sinks and sources without uid restart from savepoints ?
>> The same as above.

6. The data streams have an attribute id -  How is this generated and can this 
be used for creating a uid for the sink ?  
>> Not sure what do you mean by "attribute id". Could you give some more 
>> detailed information about it?

Regards,
Dian
On Fri, Nov 22, 2019 at 6:27 PM M Singh  wrote:

 
Hi Folks - Please let me know if you have any advice on the best practices for 
setting uid for sources and sinks.  Thanks.  MansOn Thursday, November 21, 
2019, 10:10:49 PM EST, M Singh  wrote:  
 
 Hi Folks:
I am assigning uid and name for all stateful processors in our application and 
wanted to find out the following:
1. Should we assign uid and name to the sources and sinks too ?  2. What are 
the pros and cons of adding uid to sources and sinks ?3. The sinks have uid and 
hashUid - which is the preferred attribute to use  for allowing job restarts 
?4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?  5. Can the sinks and 
sources without uid restart from savepoints ?6. The data streams have an 
attribute id -  How is this generated and can this be used for creating a uid 
for the sink ?  
Thanks for your help.
Mans  
  

Apache Flink - Throttling stream flow

2019-11-24 Thread M Singh
Hi:
I have an Flink streaming application that invokes  some other web services.  
However the webservices have limited throughput.  So I wanted to find out if 
there any recommendations on how to throttle the Flink datastream so that they 
don't overload the downstrream services.  I am using Kinesis as source and sink 
in my application.
Please let me know if there any hooks available in Flink, what are the patterns 
that can be used and what are the best practices/pitfalls for using them.
Thanks 
Mans

Re: Apache Flink - Throttling stream flow

2019-11-25 Thread M Singh
 Thanks Ciazhi & Thomas for your responses.
I read the throttling example but want to see if that work with a distributed 
broker like Kinesis and how to have throttling feedback to the Kinesis source 
so that it can vary the rate without interfering with watermarks, etc.
Thanks again 
Mans

On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian 
 wrote:  
 
 related

https://issues.apache.org/jira/browse/FLINK-13792

Regards,
Julian.


 On Mon, 25 Nov 2019 15:25:14 +0530 Caizhi Weng  
wrote 


Hi,

As far as I know, Flink currently doesn't have a built-in throttling function. 
You can write your own user-defined function to achieve this. Your function 
just gives out what it reads in and limits the speed it gives out records at 
the same time.

If you're not familiar with user-defined functions, see 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html

Here is a throttling iterator example: 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java

M Singh  于2019年11月25日周一 上午5:50写道:


Hi:

I have an Flink streaming application that invokes  some other web services.  
However the webservices have limited throughput.  So I wanted to find out if 
there any recommendations on how to throttle the Flink datastream so that they 
don't overload the downstrream services.  I am using Kinesis as source and sink 
in my application.

Please let me know if there any hooks available in Flink, what are the patterns 
that can be used and what are the best practices/pitfalls for using them.

Thanks 
Mans




  

Re: Apache Flink - Uid and name for Flink sources and sinks

2019-11-25 Thread M Singh
 
Thanks DIan for your pointers.  MansOn Sunday, November 24, 2019, 08:57:53 
PM EST, Dian Fu  wrote:  
 
 Hi Mans,
Please see my reply inline below.


在 2019年11月25日,上午5:42,M Singh  写道:
 Thanks Dian for your answers.
A few more questions:
1. If I do not assign uids to operators/sources and sinks - I am assuming the 
framework assigns it one.  Now how does another run of the the same application 
using the previous runs savepoint/checkpoint match it's tasks/operators to the 
savepoint/checkpoint state of the application ? 

You are right that the framework will generate an uid for an operator if it's 
not assigned. The uid is generated in a deterministic way to ensure that the 
uid for the same operator remains the same as previous runs(under certain 
conditions). 
The uid generation 
algorithm:https://github.com/apache/flink/blob/fd511c345eac31f03b801ff19dbf1f8c86aae760/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L78


2. Is the operatorID in the checkpoint state the same as uid ?  

3. Do you have any pointer as to how an operatorID is generated for the 
checkpoint and who can it be mapped to back to the operator for troubleshooting 
purposes ?

The OperatorID is constructed from the uid and they are the 
same:https://github.com/apache/flink/blob/66b979efc7786edb1a207339b8670d0e82c459a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L292


Regarding id attribute - I meant the following:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L139


However, I realized that this is not unique across applications runs and so not 
a good candidate.
Thanks again for your help.





On Sunday, November 24, 2019, 04:55:55 AM EST, Dian Fu 
 wrote:  
 
 1. Should we assign uid and name to the sources and sinks too ?  
>> If the sources/sinks have used state, you should assign uid for them. This 
>> is usually true for sources. 

2. What are the pros and cons of adding uid to sources and sinks ?
>> I'm not seeing the cons for assigning uid to sources and sinks. So I guess 
>> assigning the uids for sources/sinks is always a good practice.

3. The sinks have uid and hashUid - which is the preferred attribute to use  
for allowing job restarts ?
>> Could you see if this could answer you question: 
>> https://stackoverflow.com/questions/46112142/apache-flink-set-operator-uid-vs-uidhash

4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?>> It depends on whether 
the sources/sinks uses state. I think most sources use state to maintaining the 
read offset.  5. Can the sinks and sources without uid restart from savepoints ?
>> The same as above.

6. The data streams have an attribute id -  How is this generated and can this 
be used for creating a uid for the sink ?  
>> Not sure what do you mean by "attribute id". Could you give some more 
>> detailed information about it?

Regards,
Dian
On Fri, Nov 22, 2019 at 6:27 PM M Singh  wrote:

 
Hi Folks - Please let me know if you have any advice on the best practices for 
setting uid for sources and sinks.  Thanks.  MansOn Thursday, November 21, 
2019, 10:10:49 PM EST, M Singh  wrote:  
 
 Hi Folks:
I am assigning uid and name for all stateful processors in our application and 
wanted to find out the following:
1. Should we assign uid and name to the sources and sinks too ?  2. What are 
the pros and cons of adding uid to sources and sinks ?3. The sinks have uid and 
hashUid - which is the preferred attribute to use  for allowing job restarts 
?4. If sink and sources uid are not provided in the application, can they still 
maintain state across job restarts from checkpoints ?  5. Can the sinks and 
sources without uid restart from savepoints ?6. The data streams have an 
attribute id -  How is this generated and can this be used for creating a uid 
for the sink ?  
Thanks for your help.
Mans  
  

  

Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread M Singh
 Hi Kostas/Congxian:
Thanks fo your response.  
Based on your feedback, I found that I had missed adding uid to one of the 
stateful operators and correcting that resolved the issue.  I still have 
stateless operators which I have no uid specified in the application.
So, I thought that adding uid was optional and if we don't add it and run 
another instance of the same app from a savepoint or checkpoint, it will pick 
up the state based on the generated uid.  Is that a correct understanding ?  
Also, if some stateful operators have uids but some don't, will it pick up the 
state for the operators with uid and the non-uid (using the generated uid) ones 
provided the application has not changed ?
Thanks again for your response.
Mans
On Monday, November 25, 2019, 09:24:42 AM EST, Congxian Qiu 
 wrote:  
 
 Hi
The problem is that the specified uid did not in the new job.1. As far as I 
know, the answer is yes. There are some operators have their own state(such as 
window state), could you please share the minimal code of your job?2.*truely* 
stateless operator do not need to have uid, but for the reason described in the 
above, assign uid to all operators is recommended.3. if the previous job is 
still there, I'm not sure we can find the operatorId in the UI easily, maybe 
other people can answer the question.4. for this purpose, maybe you can debug 
the savepoint meta with the new job locally, maybe 
CheckpointMetadataLoadingTest can help.5. for this problem, 1.9 is same as 1.6

Best,Congxian

Kostas Kloudas  于2019年11月25日周一 下午9:42写道:

As a side note, I am assuming that you are using the same Flink Job
before and after the savepoint and the same Flink version.
Am I correct?

Cheers,
Kostas

On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
>
> Hi Singh,
>
> This behaviour is strange.
> One thing I can recommend to see if the two jobs are identical is to
> launch also the second job without a savepoint,
> just start from scratch, and simply look at the web interface to see
> if everything is there.
>
> Also could you please provide some code from your job, just to see if
> there is anything problematic with the application code?
> Normally there should be no problem with not providing UIDs for some
> stateless operators.
>
> Cheers,
> Kostas
>
> On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> >
> >
> > Hey Folks:
> >
> > Please let me know how to resolve this issue since using 
> > --allowNonRestoredState without knowing if any state will be lost seems 
> > risky.
> >
> > Thanks
> > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
> >  wrote:
> >
> >
> > Hi:
> >
> > I have a flink application in which some of the operators have uid and name 
> > and some stateless ones don't.
> >
> > I've taken a save point and tried to start another instance of the 
> > application from a savepoint - I get the following exception which 
> > indicates that the operator is not available to the new program even though 
> > the second job is the same as first but just running from the first jobs 
> > savepoint.
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to 
> > checkpoint/savepoint 
> > s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> >  Cannot map checkpoint/savepoint state for operator 
> > d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator 
> > is not available in the new program. If you want to allow to skip this, you 
> > can set the --allowNonRestoredState option on the CLI.
> >
> > at 
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> >
> > at 
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> >
> > ... 10 more
> >
> >
> >
> > I've tried to start an application instance from the checkpoint too of the 
> > first instance but it gives the same exception indicating that the operator 
> > is not available.
> >
> > Questions:
> >
> > 1. If this a problem because some of the operators don't have uid ?
> > 2. Is it required to have uids 

Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-29 Thread M Singh
 Thanks Congxian for your references.  Mans
On Wednesday, November 27, 2019, 07:12:57 AM EST, Congxian Qiu 
 wrote:  
 
 Hi,
As the doc[1] said we should assign uid to all the stateful operators. If you 
do not set uid for an operator, Flink will generate an operatorId for it, 
AFAIK, operatorId will not change as far as the job DAG does not change.
you can skip the operator's state which is not in the new job, please ref to 
doc[2], and theses operators will start from scratch.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
Best,Congxian

M Singh  于2019年11月26日周二 上午10:49写道:

 Hi Kostas/Congxian:
Thanks fo your response.  
Based on your feedback, I found that I had missed adding uid to one of the 
stateful operators and correcting that resolved the issue.  I still have 
stateless operators which I have no uid specified in the application.
So, I thought that adding uid was optional and if we don't add it and run 
another instance of the same app from a savepoint or checkpoint, it will pick 
up the state based on the generated uid.  Is that a correct understanding ?  
Also, if some stateful operators have uids but some don't, will it pick up the 
state for the operators with uid and the non-uid (using the generated uid) ones 
provided the application has not changed ?
Thanks again for your response.
Mans
On Monday, November 25, 2019, 09:24:42 AM EST, Congxian Qiu 
 wrote:  
 
 Hi
The problem is that the specified uid did not in the new job.1. As far as I 
know, the answer is yes. There are some operators have their own state(such as 
window state), could you please share the minimal code of your job?2.*truely* 
stateless operator do not need to have uid, but for the reason described in the 
above, assign uid to all operators is recommended.3. if the previous job is 
still there, I'm not sure we can find the operatorId in the UI easily, maybe 
other people can answer the question.4. for this purpose, maybe you can debug 
the savepoint meta with the new job locally, maybe 
CheckpointMetadataLoadingTest can help.5. for this problem, 1.9 is same as 1.6

Best,Congxian

Kostas Kloudas  于2019年11月25日周一 下午9:42写道:

As a side note, I am assuming that you are using the same Flink Job
before and after the savepoint and the same Flink version.
Am I correct?

Cheers,
Kostas

On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
>
> Hi Singh,
>
> This behaviour is strange.
> One thing I can recommend to see if the two jobs are identical is to
> launch also the second job without a savepoint,
> just start from scratch, and simply look at the web interface to see
> if everything is there.
>
> Also could you please provide some code from your job, just to see if
> there is anything problematic with the application code?
> Normally there should be no problem with not providing UIDs for some
> stateless operators.
>
> Cheers,
> Kostas
>
> On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> >
> >
> > Hey Folks:
> >
> > Please let me know how to resolve this issue since using 
> > --allowNonRestoredState without knowing if any state will be lost seems 
> > risky.
> >
> > Thanks
> > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
> >  wrote:
> >
> >
> > Hi:
> >
> > I have a flink application in which some of the operators have uid and name 
> > and some stateless ones don't.
> >
> > I've taken a save point and tried to start another instance of the 
> > application from a savepoint - I get the following exception which 
> > indicates that the operator is not available to the new program even though 
> > the second job is the same as first but just running from the first jobs 
> > savepoint.
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to 
> > checkpoint/savepoint 
> > s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> >  Cannot map checkpoint/savepoint state for operator 
> > d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator 
> > is not available in the new program. If you want to allow to skip this, you 
> > can set the --allowNonRestoredState option on the CLI.
> >
> > at 
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> >
> > at 
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> >
>

Re: Apache Flink - Throttling stream flow

2019-11-29 Thread M Singh
 
Thanks Rong for your references. 
>From what I can see, the rate limiter is initialized statically.  But if the 
>load on downstream services varies, is there a way to update the rater limiter 
>at runtime ?  Please let me know if I missed anything.
Thanks again for your advice.On Wednesday, November 27, 2019, 11:32:06 AM 
EST, Rong Rong  wrote:  
 
 Hi Mans,
is this what you are looking for [1][2]?
--Rong
[1] https://issues.apache.org/jira/browse/FLINK-11501[2] 
https://github.com/apache/flink/pull/7679
On Mon, Nov 25, 2019 at 3:29 AM M Singh  wrote:

 Thanks Ciazhi & Thomas for your responses.
I read the throttling example but want to see if that work with a distributed 
broker like Kinesis and how to have throttling feedback to the Kinesis source 
so that it can vary the rate without interfering with watermarks, etc.
Thanks again 
Mans

On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian 
 wrote:  
 
 related

https://issues.apache.org/jira/browse/FLINK-13792

Regards,
Julian.


 On Mon, 25 Nov 2019 15:25:14 +0530 Caizhi Weng  
wrote 


Hi,

As far as I know, Flink currently doesn't have a built-in throttling function. 
You can write your own user-defined function to achieve this. Your function 
just gives out what it reads in and limits the speed it gives out records at 
the same time.

If you're not familiar with user-defined functions, see 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html

Here is a throttling iterator example: 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java

M Singh  于2019年11月25日周一 上午5:50写道:


Hi:

I have an Flink streaming application that invokes  some other web services.  
However the webservices have limited throughput.  So I wanted to find out if 
there any recommendations on how to throttle the Flink datastream so that they 
don't overload the downstrream services.  I am using Kinesis as source and sink 
in my application.

Please let me know if there any hooks available in Flink, what are the patterns 
that can be used and what are the best practices/pitfalls for using them.

Thanks 
Mans




  
  

Side output question

2019-12-02 Thread M Singh
Hi:
I am replacing SplitOperator in my flink application with a simple processor 
with side outputs.

My questions is that does the main stream from which we get the side outputs 
need to have any events (ie, produced using by the using collector.collect) ?  
Or can we have all the output as side outputs ? Also are there any pros and 
cons of at least one main collected output vs all side outputs ?
Thanks
Mans

Apache Flink - Retries for async processing

2019-12-09 Thread M Singh
Hi Folks:
I am working on a project where I will be using Flink's async processing 
capabilities.  The job has to make http request using a token.  The token 
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries 
when the token expires.  I found this link Re: Backoff strategies for async IO 
functions? and it appears that Flink does not support retries and periodically 
refresh a security token.  I am using 1.6 at the moment but am planning to 
migrate to 1.9 soon.

| 
| 
|  | 
Re: Backoff strategies for async IO functions?


 |

 |

 |

 
If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans


Re: Side output question

2019-12-10 Thread M Singh
 Thanks Arvid for your answer.
Can you please point me to any documentation/reference as to which metrics 
might be impacted ? Also, let me know of any other pitfall.
Once again, I appreciate your help.
On Tuesday, December 10, 2019, 03:23:01 AM EST, Arvid Heise 
 wrote:  
 
 Hi Mans,
there should be no issue to only have side-outputs in your operator. There 
should also be no big drawbacks. I guess mostly some metrics will not be 
properly populated, but you can always populate them manually or add new ones.
Best,
Arvid

On Mon, Dec 2, 2019 at 8:40 PM M Singh  wrote:

Hi:
I am replacing SplitOperator in my flink application with a simple processor 
with side outputs.

My questions is that does the main stream from which we get the side outputs 
need to have any events (ie, produced using by the using collector.collect) ?  
Or can we have all the output as side outputs ? Also are there any pros and 
cons of at least one main collected output vs all side outputs ?
Thanks
Mans
  

Re: Apache Flink - Retries for async processing

2019-12-10 Thread M Singh
 Thanks Jingsong for sharing your solution.
Since both refreshing the token and the actual API request can fail with either 
recoverable and unrecoverable exceptions, are there any patterns for retrying 
both and making the code robust to failures.
Thanks again.
On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li 
 wrote:  
 
 Hi M Singh,
Our internal has this scenario too, as far as I know, Flink does not have this 
internal mechanism in 1.9 too.I can share my solution:- In async function, 
start a thread factory.- Send the call to thread factory when this call has 
failed. Do refresh security token too.Actually, deal with anything in function. 
As long as we finally call the relevant methods of ResultFuture.
Best,Jingsong Lee
On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:

Hi Folks:
I am working on a project where I will be using Flink's async processing 
capabilities.  The job has to make http request using a token.  The token 
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries 
when the token expires.  I found this link Re: Backoff strategies for async IO 
functions? and it appears that Flink does not support retries and periodically 
refresh a security token.  I am using 1.6 at the moment but am planning to 
migrate to 1.9 soon.

| 
| 
|  | 
Re: Backoff strategies for async IO functions?


 |

 |

 |

 
If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans



-- 
Best, Jingsong Lee  

Apache Flink - Clarifications about late side output

2019-12-10 Thread M Singh
Hi:
I have a few questions about the side output late data.  
Here is the API
stream
   .keyBy(...)   <-  keyed versus non-keyed windows
   .window(...)  <-  required: "assigner"
  [.trigger(...)]<-  optional: "trigger" (else default trigger)
  [.evictor(...)]<-  optional: "evictor" (else no evictor)
  [.allowedLateness(...)]<-  optional: "lateness" (else zero)
  [.sideOutputLateData(...)] <-  optional: "output tag" (else no side 
output for late data)
   .reduce/aggregate/fold/apply()  <-  required: "function"
  [.getSideOutput(...)]  <-  optional: "output tag"

Apache Flink 1.9 Documentation: Windows

| 
| 
|  | 
Apache Flink 1.9 Documentation: Windows


 |

 |

 |



Here is the documentation:
Late elements considerations

When specifying an allowed lateness greater than 0, the window along with its 
content is kept after the watermark passes the end of the window. In these 
cases, when a late but not dropped element arrives, it could trigger another 
firing for the window. These firings are called late firings, as they are 
triggered by late events and in contrast to the main firing which is the first 
firing of the window. In case of session windows, late firings can further lead 
to merging of windows, as they may “bridge” the gap between two pre-existing, 
unmerged windows.

Attention You should be aware that the elements emitted by a late firing should 
be treated as updated results of a previous computation, i.e., your data stream 
will contain multiple results for the same computation. Depending on your 
application, you need to take these duplicated results into account or 
deduplicate them.

Questions:
1. If we have allowed lateness to be greater than 0 (say 5), then if an event 
which arrives at window end + 3 (within allowed lateness),     (a) it is 
considered late and  included in the window function as a late firing ?      
(b) Are the late firings under the control of the trigger ?      (c) If there 
are may events like this - are there multiple window function invocations ?     
(d) Are these events (still within window end + allowed lateness) also emitted 
via the side output late data ?2. If an event arrives after the window end + 
allowed lateness -     (a) Is it excluded from the window function but still 
emitted from the side output late data ?      (b) And if it is emitted is there 
any attribute which indicates for which window it was a late event ?      (c) 
Is there any time limit while the late side output remains active for a 
particular window or all late events channeled to it ?
Thanks
Thanks
Mans






Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
 Thanks Timo for your answer.  I will try the prototype but was wondering if I 
can find some theoretical documentation to give me a sound understanding.
Mans
On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther 
 wrote:  
 
 Little mistake: The key must be any constant instead of `e`.


On 11.12.19 11:42, Timo Walther wrote:
> Hi Mans,
> 
> I would recommend to create a little prototype to answer most of your 
> questions in action.
> 
> You can simple do:
> 
> stream = env.fromElements(1L, 2L, 3L, 4L)
>     .assignTimestampsAndWatermarks(
>     new AssignerWithPunctuatedWatermarks{
>     extractTimestamp(e) = e,
>     checkAndGetNextWatermark(e, ts) = new Watermark(e)
>     })
> 
> stream.keyBy(e -> e).window(...).print()
> env.execute()
> 
> This allows to quickly create a stream of event time for testing the 
> semantics.
> 
> I hope this helps. Otherwise of course we can help you in finding the 
> answers to the remaining questions.
> 
> Regards,
> Timo
> 
> 
> 
> On 10.12.19 20:32, M Singh wrote:
>> Hi:
>>
>> I have a few questions about the side output late data.
>>
>> Here is the API
>>
>> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- 
>> required: "assigner" [.trigger(...)] <- optional: "trigger" (else 
>> default trigger) [.evictor(...)] <- optional: "evictor" (else no 
>> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) 
>> [.sideOutputLateData(...)] <- optional: "output tag" (else no side 
>> output for late data) .reduce/aggregate/fold/apply() <- required: 
>> "function" [.getSideOutput(...)] <- optional: "output tag"|
>>
>>
>>
>> Apache Flink 1.9 Documentation: Windows 
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>  
>>
>>
>>
>>
>>
>>     Apache Flink 1.9 Documentation: Windows
>>
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>  
>>
>>
>>
>> Here is the documentation:
>>
>>
>>   Late elements
>>      
>> considerations<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>  
>>
>>
>> When specifying an allowed lateness greater than 0, the window along 
>> with its content is kept after the watermark passes the end of the 
>> window. In these cases, when a late but not dropped element arrives, 
>> it could trigger another firing for the window. These firings are 
>> called |late firings|, as they are triggered by late events and in 
>> contrast to the |main firing| which is the first firing of the window. 
>> In case of session windows, late firings can further lead to merging 
>> of windows, as they may “bridge” the gap between two pre-existing, 
>> unmerged windows.
>>
>> Attention You should be aware that the elements emitted by a late 
>> firing should be treated as updated results of a previous computation, 
>> i.e., your data stream will contain multiple results for the same 
>> computation. Depending on your application, you need to take these 
>> duplicated results into account or deduplicate them.
>>
>>
>> Questions:
>>
>> 1. If we have allowed lateness to be greater than 0 (say 5), then if 
>> an event which arrives at window end + 3 (within allowed lateness),
>>      (a) it is considered late and  included in the window function as 
>> a late firing ?
>>      (b) Are the late firings under the control of the trigger ?
>>      (c) If there are may events like this - are there multiple window 
>> function invocations ?
>>      (d) Are these events (still within window end + allowed lateness) 
>> also emitted via the side output late data ?
>> 2. If an event arrives after the window end + allowed lateness -
>>      (a) Is it excluded from the window function but still emitted 
>> from the side output late data ?
>>      (b) And if it is emitted is there any attribute which indicates 
>> for which window it was a late event ?
>>      (c) Is there any time limit while the late side output remains 
>> active for a particular window or all late events channeled to it ?
>>
>> Thanks
>>
>> Thanks
>>
>> Mans
>>
>>
>>
>>
>>

  

Re: Apache Flink - Retries for async processing

2019-12-11 Thread M Singh
 Thanks Zhu for your advice.  Mans
On Tuesday, December 10, 2019, 09:32:01 PM EST, Zhu Zhu  
wrote:  
 
 Hi M Singh,
I think you would be able to know the request failure cause and whether it is 
recoverable or not.You can handle the error as you like. For example, if you 
think the error is unrecoverable, you can complete the ResultFuture 
exceptionally to expose this failure to Flink framework. If the error is 
recoverable, you can just retry (or refresh the token), and only complete the 
ResultFuture until it succeeds (until timeout).
Thanks,Zhu Zhu
M Singh  于2019年12月10日周二 下午8:51写道:

 Thanks Jingsong for sharing your solution.
Since both refreshing the token and the actual API request can fail with either 
recoverable and unrecoverable exceptions, are there any patterns for retrying 
both and making the code robust to failures.
Thanks again.
On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li 
 wrote:  
 
 Hi M Singh,
Our internal has this scenario too, as far as I know, Flink does not have this 
internal mechanism in 1.9 too.I can share my solution:- In async function, 
start a thread factory.- Send the call to thread factory when this call has 
failed. Do refresh security token too.Actually, deal with anything in function. 
As long as we finally call the relevant methods of ResultFuture.
Best,Jingsong Lee
On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:

Hi Folks:
I am working on a project where I will be using Flink's async processing 
capabilities.  The job has to make http request using a token.  The token 
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries 
when the token expires.  I found this link Re: Backoff strategies for async IO 
functions? and it appears that Flink does not support retries and periodically 
refresh a security token.  I am using 1.6 at the moment but am planning to 
migrate to 1.9 soon.

| 
| 
|  | 
Re: Backoff strategies for async IO functions?


 |

 |

 |

 
If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans



-- 
Best, Jingsong Lee  
  

Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
 Thanks David for your detailed answers.   Mans
On Wednesday, December 11, 2019, 08:12:51 AM EST, David Anderson 
 wrote:  
 
 
If we have allowed lateness to be greater than 0 (say 5), then if an event 
which arrives at window end + 3 (within allowed lateness), 

    (a) it is considered late and included in the window function as a late 
firing ?
An event with a timestamp that falls within the window's boundaries that 
arrives when the current watermark is at window end + 3 will be included as a 
late event that has arrived within the allowed lateness.

Actually, I'm not sure I got this right -- on this point I recommend some 
experimentation, or careful reading of the code.
On Wed, Dec 11, 2019 at 2:08 PM David Anderson  wrote:

I'll attempt to answer your questions.

If we have allowed lateness to be greater than 0 (say 5), then if an event 
which arrives at window end + 3 (within allowed lateness), 
    (a) it is considered late and included in the window function as a late 
firing ?


An event with a timestamp that falls within the window's boundaries that 
arrives when the current watermark is at window end + 3 will be included as a 
late event that has arrived within the allowed lateness. 
    (b) Are the late firings under the control of the trigger ? 


Yes, the trigger is involved in all firings, late or not. 
    (c) If there are may events like this - are there multiple window function 
invocations ?

With the default event time trigger, each late event causes a late firing. You 
could use a custom trigger to implement other behaviors. 
    (d) Are these events (still within window end + allowed lateness) also 
emitted via the side output late data ?


No. The side output for late events is only used to collect events that fall 
outside the allowed lateness. 
2. If an event arrives after the window end + allowed lateness - 
    (a) Is it excluded from the window function but still emitted from the side 
output late data ?  


Yes. 
    (b) And if it is emitted is there any attribute which indicates for which 
window it was a late event ?  


No, the event is emitted without any additional information.

    (c) Is there any time limit while the late side output remains active for a 
particular window or all late events channeled to it ?

There is no time limit; the late side output remains operative indefinitely. 
Hope that helps,David
On Wed, Dec 11, 2019 at 1:40 PM M Singh  wrote:

 Thanks Timo for your answer.  I will try the prototype but was wondering if I 
can find some theoretical documentation to give me a sound understanding.
Mans
On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther 
 wrote:  
 
 Little mistake: The key must be any constant instead of `e`.


On 11.12.19 11:42, Timo Walther wrote:
> Hi Mans,
> 
> I would recommend to create a little prototype to answer most of your 
> questions in action.
> 
> You can simple do:
> 
> stream = env.fromElements(1L, 2L, 3L, 4L)
>     .assignTimestampsAndWatermarks(
>     new AssignerWithPunctuatedWatermarks{
>     extractTimestamp(e) = e,
>     checkAndGetNextWatermark(e, ts) = new Watermark(e)
>     })
> 
> stream.keyBy(e -> e).window(...).print()
> env.execute()
> 
> This allows to quickly create a stream of event time for testing the 
> semantics.
> 
> I hope this helps. Otherwise of course we can help you in finding the 
> answers to the remaining questions.
> 
> Regards,
> Timo
> 
> 
> 
> On 10.12.19 20:32, M Singh wrote:
>> Hi:
>>
>> I have a few questions about the side output late data.
>>
>> Here is the API
>>
>> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- 
>> required: "assigner" [.trigger(...)] <- optional: "trigger" (else 
>> default trigger) [.evictor(...)] <- optional: "evictor" (else no 
>> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) 
>> [.sideOutputLateData(...)] <- optional: "output tag" (else no side 
>> output for late data) .reduce/aggregate/fold/apply() <- required: 
>> "function" [.getSideOutput(...)] <- optional: "output tag"|
>>
>>
>>
>> Apache Flink 1.9 Documentation: Windows 
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>  
>>
>>
>>
>>
>>
>>     Apache Flink 1.9 Documentation: Windows
>>
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>  
>>
>>
>>
>> Here is the documentation:
>>
>>
>>   Late elements
>>      
>> considerations

Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread M Singh
Hi:
I am using AWS EMR with Flink application and two of the job managers are 
running on the same host.  I am looking at the metrics documentation (Apache 
Flink 1.9 Documentation: Metrics) and and see the following: 

| 
| 
|  | 
Apache Flink 1.9 Documentation: Metrics


 |

 |

 |

   
   - metrics.scope.jm  
  - Default: .jobmanager
  - Applied to all metrics that were scoped to a job manager.
  - 

...
List of all Variables
   
   - JobManager: 
   - TaskManager: , 
   - Job: , 
   - Task: , , , , 

   - Operator: ,, 


My question is there a way to distinguish b/w the two job managers ? I see only 
the  variable for JobManager and since the two are running on the same 
host, the value is the same.  Is there any other variable that I can use to 
distinguish the two.

For taskmanager I have taskmanager id but am not sure about the job manager.
Thanks
Mans


Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-19 Thread M Singh
 Thanks Vino and Biao for your help.  Mans
On Thursday, December 19, 2019, 02:25:40 AM EST, Biao Liu 
 wrote:  
 
 Hi Mans,
That's indeed a problem. We have a plan to fix it. I think it could be included 
in 1.11. You could follow this issue [1] to check the progress. 
[1] https://issues.apache.org/jira/browse/FLINK-9543

Thanks,Biao /'bɪ.aʊ/


On Thu, 19 Dec 2019 at 14:51, vino yang  wrote:

Hi Mans,
IMO, one job manager represents one Flink cluster and one Flink cluster has a 
suite of Flink configuration e.g. metrics reporter.
Some metrics reporters support tag feature, you can specify it to distinguish 
different Flink cluster.[1]
[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
Best,Vino
M Singh  于2019年12月19日周四 上午2:54写道:

Hi:
I am using AWS EMR with Flink application and two of the job managers are 
running on the same host.  I am looking at the metrics documentation (Apache 
Flink 1.9 Documentation: Metrics) and and see the following: 

| 
| 
|  | 
Apache Flink 1.9 Documentation: Metrics


 |

 |

 |

   
   - metrics.scope.jm  
  - Default: .jobmanager
  - Applied to all metrics that were scoped to a job manager.
  - 

...
List of all Variables
   
   - JobManager: 
   - TaskManager: , 
   - Job: , 
   - Task: , , , , 

   - Operator: ,, 


My question is there a way to distinguish b/w the two job managers ? I see only 
the  variable for JobManager and since the two are running on the same 
host, the value is the same.  Is there any other variable that I can use to 
distinguish the two.

For taskmanager I have taskmanager id but am not sure about the job manager.
Thanks
Mans


  

Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-24 Thread M Singh
Hi:
I wanted to find out what's the best way of collecting Flink metrics using 
Prometheus in a streaming application on EMR/Hadoop.
Since the Flink streaming jobs could be running on any node - is there any 
Prometheus configuration or service discovery option available that will 
dynamically pick up the metrics from the Filnk job and task managers running in 
cluster ?  
I believe for a batch job I can configure flink config to use Prometheus 
gateway configuration but I think this is not recommended for a streaming job.
Please let me know if you have any advice.
Thanks
Mans

Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-25 Thread M Singh
 Thanks Vino and Rafi for your references.
Regarding push gateway recommendations for batch - I am following this 
reference (https://prometheus.io/docs/practices/pushing/).
The scenario that I have is that we start Flink Apps on EMR whenever we need 
them. Sometimes the task manager gets killed and then restarted on another 
node.  In order to keep up with registering new task/job managers and 
de-registering the stopped/removed ones, I wanted to see if there is any 
service discovery integration with Flink apps.  
Thanks again for your help and let me know if you have any additional pointers.
On Wednesday, December 25, 2019, 03:39:31 AM EST, Rafi Aroch 
 wrote:  
 
 Hi,
Take a look here: https://github.com/eastcirclek/flink-service-discovery
I used it successfully quite a while ago, so things might have changed since.
Thanks, Rafi 
On Wed, Dec 25, 2019, 05:54 vino yang  wrote:

Hi Mans,
IMO, the mechanism of metrics reporter does not depend on any deployment mode.
>> is there any Prometheus configuration or service discovery option available 
>>that will dynamically pick up the metrics from the Filnk job and task 
>>managers running in cluster ?
Can you share more information about your scene?
>> I believe for a batch job I can configure flink config to use Prometheus 
>>gateway configuration but I think this is not recommended for a streaming job.
What does this mean? Why the Prometheus gateway configuration for Flink batch 
job is not recommended for a streaming job?
Best,Vino
M Singh  于2019年12月24日周二 下午4:02写道:

Hi:
I wanted to find out what's the best way of collecting Flink metrics using 
Prometheus in a streaming application on EMR/Hadoop.
Since the Flink streaming jobs could be running on any node - is there any 
Prometheus configuration or service discovery option available that will 
dynamically pick up the metrics from the Filnk job and task managers running in 
cluster ?  
I believe for a batch job I can configure flink config to use Prometheus 
gateway configuration but I think this is not recommended for a streaming job.
Please let me know if you have any advice.
Thanks
Mans

  

Apache Flink - Sharing state in processors

2020-01-10 Thread M Singh
Hi:
I have a few question about how state is shared in processors in Flink.
1. If I have a processor instantiated in the Flink app, and apply use in 
multiple times in the Flink -     (a) if the tasks are in the same slot - do 
they share the same processor on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
same processor on the taskmanager ?

2. If I instantiate a single processor with local state and use it in multiple 
times in Flink     (a) if the tasks are in the same slot - do they share the 
same processor and state on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
same processor and state on the taskmanager ?

3. If I instantiate a multiple processors with shared collection and use it in 
multiple times in Flink     (a) if the tasks are in the same slot - do they 
share the state on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
state on the taskmanager ?
4. How do the above scenarios affect sharing (a) operator state(b) 
keyed state
5. If I have have a parallelism of > 1, and use keyBy - is each key handled by 
only one instance of the processor ?  I believe so, but wanted to confirm.

Thanks
Mans






Re: Apache Flink - Sharing state in processors

2020-01-23 Thread M Singh
 Thanks Yun for your answers.
By processor I did mean user defined processor function. Keeping that in view, 
do you have any advice on how the shared state - ie, the parameters passed to 
the processor as mentioned above (not the key state or operator state) will be 
affected in a distributed runtime env ?
Mans
On Sunday, January 12, 2020, 09:51:10 PM EST, Yun Tang  
wrote:  
 
 #yiv0773511519 P {margin-top:0;margin-bottom:0;}Hi Mans
What's the meaning of 'processor' you defined here? A user defined function?
When talking about share state, I'm afraid it's not so easy to implement in 
Flink. As no matter keyed state or operator state, they're both instantiated, 
used and only thread-safe in operator scope. The only way to read read-only 
state during runtime is via queryable state[1]
For the question of keyBy, the message would only sent to one of task in 
downstream according to the hashcode [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html[2]
 
https://github.com/apache/flink/blob/7a6ca9c03f67f488e40a114e94c389a5cfb67836/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58


BestYun Tang

From: M Singh 
Sent: Friday, January 10, 2020 23:29
To: User 
Subject: Apache Flink - Sharing state in processors Hi:
I have a few question about how state is shared in processors in Flink.
1. If I have a processor instantiated in the Flink app, and apply use in 
multiple times in the Flink -     (a) if the tasks are in the same slot - do 
they share the same processoron the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
same processor on the taskmanager ?

2. If I instantiate a single processor with local state and use it in multiple 
times in Flink     (a) if the tasks are in the same slot - do they share the 
same processor and stateon the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
same processor and stateon the taskmanager ?

3. If I instantiate a multiple processors with shared collection and use it in 
multiple times in Flink     (a) if the tasks are in the same slot - do they 
share the state on the taskmanager ?
    (b) if the tasks are on same node but different slots - do they share the 
stateon the taskmanager ?
4. How do the above scenarios affect sharing (a) operator state(b) 
keyed state
5. If I have have a parallelism of > 1, and use keyBy - is each key handled by 
only one instance of the processor ?  I believe so, but wanted to confirm.

Thanks
Mans




  

Apache Flink Job fails repeatedly due to RemoteTransportException

2020-01-28 Thread M Singh
Hi Folks:
We have streaming Flink application (using v 1.6.2) and it dies within 12 
hours.  We have configured number of restarts which is 10 at the moment.
Sometimes the job runs for some time and then within a very short time has a 
number of restarts and finally fails.  In other instances, the restarts happen 
randomly. So there is no pattern that I could discern for the restarts.
I can increase the restart count but would like to see if there is any advice 
on the root cause of this issue.  I've seen a some emails in the user groups 
but could not find any definitive solution or investigation steps.

Is there any any on how to investigate it further or resolve it ?
The exception we see in the job manager is:
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job testJob 
(d65a52389f9ea30def1fe522bf3956c6) switched from state FAILING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623'. This might indicate that 
the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
restart the job testJob (d65a52389f9ea30def1fe522bf3956c6) because the restart 
strategy prevented it.


Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-22 Thread M Singh
Hey Folks:
I am trying to figure out the options for running Flink on Kubernetes and am 
trying to find out the pros and cons of running in Flink Session vs Flink 
Cluster mode 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes).
I understand that in job mode there is no need to submit the job since it is 
part of the job image.  But what are other the pros and cons of this approach 
vs session mode where a job manager is deployed and flink jobs can be submitted 
it ?  Are there any benefits with regards to:
1. Configuring the jobs 2. Scaling the taskmanager3. Restarting jobs4. Managing 
the flink jobs5. Passing credentials (in case of AWS, etc)6. Fault tolerence 
and recovery of jobs from failure
Also, we will be keeping the checkpoints for the jobs on S3.  Is there any need 
for specifying volume for the pods ?  If volume is required do we need 
provisioned volume and what are the recommended alternatives/considerations 
especially with AWS.
If there are any other considerations, please let me know.
Thanks for your advice.





Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-24 Thread M Singh
 Thanks Wang for your detailed answers.
>From what I understand the native_kubernetes also leans towards creating a 
>session and submitting a job to it.  
Regarding other recommendations, please my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
 Hi Singh,
Glad to hear that you are looking to run Flink on the Kubernetes. I amtrying to 
answer your question based on my limited knowledge andothers could correct me 
and add some more supplements.
I think the biggest difference between session cluster and per-job clusteron 
Kubernetesis the isolation. Since for per-job, a dedicated Flink clusterwill be 
started for the only one job and no any other jobs could be submitted.Once the 
job is finished, then the Flink cluster will be destroyed immediately.The 
second point is one-step submission. You do not need to start a Flinkcluster 
first and then submit a job to the existing session.
> Are there any benefits with regards to1. Configuring the jobsNo matter you 
>are using the per-job cluster or submitting to the existingsession cluster, 
>they share the configuration mechanism. You do not haveto change any codes and 
>configurations.
2. Scaling the taskmanagerSince you are using the Standalone cluster on 
Kubernetes, it do not providean active resourcemanager. You need to use 
external tools to monitor andscale up the taskmanagers. The active integration 
is still evolving and youcould have a taste[1].
Mans - If we use the session based deployment option for K8 - I thought K8 will 
automatically restarts any failed TM or JM. In the case of failed TM - the job 
will probably recover, but in the case of failed JM - perhaps we need to 
resubmit all jobs.Let me know if I have misunderstood anything.
3. Restarting jobsFor the session cluster, you could directly cancel the job 
and re-submit. Andfor per-job cluster, when the job is canceled, you need to 
start a new per-jobcluster from the latest savepoint.
4. Managing the flink jobsThe rest api and flink command line could be used to 
managing the jobs(e.g.flink cancel, etc.). I think there is no difference for 
session and per-job here.
5. Passing credentials (in case of AWS, etc)
I am not sure how do you provide your credentials. If you put them in the 
config map and then mount into the jobmanager/taskmanager pod, then bothsession 
and per-job could support this way.
Mans - Is there any safe way of a passing creds ?
6. Fault tolerence and recovery of jobs from failure
For session cluster, if one taskmanager crashed, then all the jobs which have 
taskson this taskmanager will failed. Both session and per-job could be 
configured with high availability and recoverfrom the latest checkpoint. 
Mans - Does a task manager failure cause the job to fail ?  My understanding is 
the JM failure are catastrophic while TM failures are recoverable.
> Is there any need for specifying volume for the pods?No, you do not need to 
>specify a volume for pod. All the data in the pod local directory is 
>temporary. When a pod crashed and relaunched, thetaskmanager will retrieve the 
>checkpoint from zookeeper + S3 and resumefrom the latest checkpoint.
Mans - So if we are saving checkpoint in S3 then there is no need for disks - 
should we use emptyDir ?

[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
M Singh  于2020年2月23日周日 上午2:28写道:

Hey Folks:
I am trying to figure out the options for running Flink on Kubernetes and am 
trying to find out the pros and cons of running in Flink Session vs Flink 
Cluster mode 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes).
I understand that in job mode there is no need to submit the job since it is 
part of the job image.  But what are other the pros and cons of this approach 
vs session mode where a job manager is deployed and flink jobs can be submitted 
it ?  Are there any benefits with regards to:
1. Configuring the jobs 2. Scaling the taskmanager3. Restarting jobs4. Managing 
the flink jobs5. Passing credentials (in case of AWS, etc)6. Fault tolerence 
and recovery of jobs from failure
Also, we will be keeping the checkpoints for the jobs on S3.  Is there any need 
for specifying volume for the pods ?  If volume is required do we need 
provisioned volume and what are the recommended alternatives/considerations 
especially with AWS.
If there are any other considerations, please let me know.
Thanks for your advice.




  

Re: Apache Flink Job fails repeatedly due to RemoteTransportException

2020-02-24 Thread M Singh
 Thanks will try your recommendations and apologize for the delayed response.
On Wednesday, January 29, 2020, 09:58:26 AM EST, Till Rohrmann 
 wrote:  
 
 Hi M Singh,
have you checked the TaskManager logs of 
ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623 for any suspicious logging 
statements? This might help to uncover why another node thinks that this 
TaskManager is no longer reachable.
You could also try whether the same problem remains if you upgrade to one of 
Flink latest versions (1.9.1 for example).
Cheers,Till
On Wed, Jan 29, 2020 at 8:37 AM M Singh  wrote:

Hi Folks:
We have streaming Flink application (using v 1.6.2) and it dies within 12 
hours.  We have configured number of restarts which is 10 at the moment.
Sometimes the job runs for some time and then within a very short time has a 
number of restarts and finally fails.  In other instances, the restarts happen 
randomly. So there is no pattern that I could discern for the restarts.
I can increase the restart count but would like to see if there is any advice 
on the root cause of this issue.  I've seen a some emails in the user groups 
but could not find any definitive solution or investigation steps.

Is there any any on how to investigate it further or resolve it ?
The exception we see in the job manager is:
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job testJob 
(d65a52389f9ea30def1fe522bf3956c6) switched from state FAILING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623'. This might indicate that 
the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
restart the job testJob (d65a52389f9ea30def1fe522bf3956c6) because the restart 
strategy prevented it.

  

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread M Singh
 Thanks Yang and Arvid for your advice and pointers.  Mans
On Wednesday, February 26, 2020, 09:52:26 AM EST, Arvid Heise 
 wrote:  
 
 Creds on AWS are typically resolved through roles assigned to K8s pods (for 
example with KIAM [1]).
[1] https://github.com/uswitch/kiam
On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:

Hi M Singh,

> Mans - If we use the session based deployment option for K8 - I thought K8 
>will automatically restarts any failed TM or JM. 
In the case of failed TM - the job will probably recover, but in the case of 
failed JM - perhaps we need to resubmit all jobs.
Let me know if I have misunderstood anything.

Since you are starting JM/TM with K8s deployment, when they failed new JM/TM 
will be created. If you do not set the highavailability configuration, your 
jobs could recover when TM failed. However, they could not recover when JM 
failed. With HA
configured, the jobs could always be recovered and you do not need to re-submit 
again.

> Mans - Is there any safe way of a passing creds ?

Yes, you are right, Using configmap to pass the credentials is not safe. On 
K8s, i think you could use secrets instead[1].

> Mans - Does a task manager failure cause the job to fail ?  My understanding 
> is the JM failure are catastrophic while TM failures are recoverable.

What i mean is the job failed, and it could be restarted by your configured 
restart strategy[2].

> Mans - So if we are saving checkpoint in S3 then there is no need for disks - 
>should we use emptyDir ?
 Yes, if you are saving the checkpoint in S3 and also set the 
`high-availability.storageDir` to S3. Then you do not need persistent volume. 
Sincethe local directory is only used for local cache, so you could directly 
use the overlay filesystem or empryDir(better io performance).

[1]. 
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/[2].
 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
M Singh  于2020年2月25日周二 上午5:53写道:

 Thanks Wang for your detailed answers.
>From what I understand the native_kubernetes also leans towards creating a 
>session and submitting a job to it.  
Regarding other recommendations, please my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
 Hi Singh,
Glad to hear that you are looking to run Flink on the Kubernetes. I amtrying to 
answer your question based on my limited knowledge andothers could correct me 
and add some more supplements.
I think the biggest difference between session cluster and per-job clusteron 
Kubernetesis the isolation. Since for per-job, a dedicated Flink clusterwill be 
started for the only one job and no any other jobs could be submitted.Once the 
job is finished, then the Flink cluster will be destroyed immediately.The 
second point is one-step submission. You do not need to start a Flinkcluster 
first and then submit a job to the existing session.
> Are there any benefits with regards to1. Configuring the jobsNo matter you 
>are using the per-job cluster or submitting to the existingsession cluster, 
>they share the configuration mechanism. You do not haveto change any codes and 
>configurations.
2. Scaling the taskmanagerSince you are using the Standalone cluster on 
Kubernetes, it do not providean active resourcemanager. You need to use 
external tools to monitor andscale up the taskmanagers. The active integration 
is still evolving and youcould have a taste[1].
Mans - If we use the session based deployment option for K8 - I thought K8 will 
automatically restarts any failed TM or JM. In the case of failed TM - the job 
will probably recover, but in the case of failed JM - perhaps we need to 
resubmit all jobs.Let me know if I have misunderstood anything.
3. Restarting jobsFor the session cluster, you could directly cancel the job 
and re-submit. Andfor per-job cluster, when the job is canceled, you need to 
start a new per-jobcluster from the latest savepoint.
4. Managing the flink jobsThe rest api and flink command line could be used to 
managing the jobs(e.g.flink cancel, etc.). I think there is no difference for 
session and per-job here.
5. Passing credentials (in case of AWS, etc)
I am not sure how do you provide your credentials. If you put them in the 
config map and then mount into the jobmanager/taskmanager pod, then bothsession 
and per-job could support this way.
Mans - Is there any safe way of a passing creds ?
6. Fault tolerence and recovery of jobs from failure
For session cluster, if one taskmanager crashed, then all the jobs which have 
taskson this taskmanager will failed. Both session and per-job could be 
configured with high availability and recoverfrom the latest checkpoint. 
Mans - Does a task manager failure cause the job to fail ?  My understanding is 
the JM failure are catastrophic while TM failures are recoverable.
> Is there any need for specifying volume for t

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-26 Thread M Singh
 BTW - Is there any limit to the amount of data that can be stored on emptyDir 
in K8 ?  
On Wednesday, February 26, 2020, 07:33:54 PM EST, M Singh 
 wrote:  
 
  Thanks Yang and Arvid for your advice and pointers.  Mans
On Wednesday, February 26, 2020, 09:52:26 AM EST, Arvid Heise 
 wrote:  
 
 Creds on AWS are typically resolved through roles assigned to K8s pods (for 
example with KIAM [1]).
[1] https://github.com/uswitch/kiam
On Tue, Feb 25, 2020 at 3:36 AM Yang Wang  wrote:

Hi M Singh,

> Mans - If we use the session based deployment option for K8 - I thought K8 
>will automatically restarts any failed TM or JM. 
In the case of failed TM - the job will probably recover, but in the case of 
failed JM - perhaps we need to resubmit all jobs.
Let me know if I have misunderstood anything.

Since you are starting JM/TM with K8s deployment, when they failed new JM/TM 
will be created. If you do not set the highavailability configuration, your 
jobs could recover when TM failed. However, they could not recover when JM 
failed. With HA
configured, the jobs could always be recovered and you do not need to re-submit 
again.

> Mans - Is there any safe way of a passing creds ?

Yes, you are right, Using configmap to pass the credentials is not safe. On 
K8s, i think you could use secrets instead[1].

> Mans - Does a task manager failure cause the job to fail ?  My understanding 
> is the JM failure are catastrophic while TM failures are recoverable.

What i mean is the job failed, and it could be restarted by your configured 
restart strategy[2].

> Mans - So if we are saving checkpoint in S3 then there is no need for disks - 
>should we use emptyDir ?
 Yes, if you are saving the checkpoint in S3 and also set the 
`high-availability.storageDir` to S3. Then you do not need persistent volume. 
Sincethe local directory is only used for local cache, so you could directly 
use the overlay filesystem or empryDir(better io performance).

[1]. 
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/[2].
 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
M Singh  于2020年2月25日周二 上午5:53写道:

 Thanks Wang for your detailed answers.
>From what I understand the native_kubernetes also leans towards creating a 
>session and submitting a job to it.  
Regarding other recommendations, please my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
 Hi Singh,
Glad to hear that you are looking to run Flink on the Kubernetes. I amtrying to 
answer your question based on my limited knowledge andothers could correct me 
and add some more supplements.
I think the biggest difference between session cluster and per-job clusteron 
Kubernetesis the isolation. Since for per-job, a dedicated Flink clusterwill be 
started for the only one job and no any other jobs could be submitted.Once the 
job is finished, then the Flink cluster will be destroyed immediately.The 
second point is one-step submission. You do not need to start a Flinkcluster 
first and then submit a job to the existing session.
> Are there any benefits with regards to1. Configuring the jobsNo matter you 
>are using the per-job cluster or submitting to the existingsession cluster, 
>they share the configuration mechanism. You do not haveto change any codes and 
>configurations.
2. Scaling the taskmanagerSince you are using the Standalone cluster on 
Kubernetes, it do not providean active resourcemanager. You need to use 
external tools to monitor andscale up the taskmanagers. The active integration 
is still evolving and youcould have a taste[1].
Mans - If we use the session based deployment option for K8 - I thought K8 will 
automatically restarts any failed TM or JM. In the case of failed TM - the job 
will probably recover, but in the case of failed JM - perhaps we need to 
resubmit all jobs.Let me know if I have misunderstood anything.
3. Restarting jobsFor the session cluster, you could directly cancel the job 
and re-submit. Andfor per-job cluster, when the job is canceled, you need to 
start a new per-jobcluster from the latest savepoint.
4. Managing the flink jobsThe rest api and flink command line could be used to 
managing the jobs(e.g.flink cancel, etc.). I think there is no difference for 
session and per-job here.
5. Passing credentials (in case of AWS, etc)
I am not sure how do you provide your credentials. If you put them in the 
config map and then mount into the jobmanager/taskmanager pod, then bothsession 
and per-job could support this way.
Mans - Is there any safe way of a passing creds ?
6. Fault tolerence and recovery of jobs from failure
For session cluster, if one taskmanager crashed, then all the jobs which have 
taskson this taskmanager will failed. Both session and per-job could be 
configured with high availability and recoverfrom the latest checkpoint. 
Mans - Does a task mana

Flink - Once and once processing

2016-07-29 Thread M Singh
Hi:
I have a use case where we need to update a counter in a db and for this need 
to guarantee once only processing.  If we have some entries in a batch and it 
partially updates the counters and then fails, if Flink retries the processing 
for that batch, some of the counters will be updated twice (the ones which 
succeeded in the first batch).
I think in order to guarantee once only processing, I will have to set the 
buffer size to zero (ie, send one item at a time). 

Is there any alternative configuration or suggestion on how I can achieve once 
only updates using a batch mode with partial failures ?
Thanks
Mans



Re: Flink - Once and once processing

2016-07-30 Thread M Singh
Thanks Konstantin.
Just to clarify - unless the target database is resilient to duplicates, 
Flink's once-only configuration will not avoid duplicate updates.
Mans 

On Saturday, July 30, 2016 7:40 AM, Konstantin Knauf 
 wrote:
 

 Hi Mans,

depending on the number of operations and the particular database, you
might be able to use transactions.

Maybe you can also find a data model, which is more resilient to these
kind of failures.

Cheers,

Konstantin

On 29.07.2016 19:26, M Singh wrote:
> Hi:
> 
> I have a use case where we need to update a counter in a db and for this
> need to guarantee once only processing.  If we have some entries in a
> batch and it partially updates the counters and then fails, if Flink
> retries the processing for that batch, some of the counters will be
> updated twice (the ones which succeeded in the first batch).
> 
> I think in order to guarantee once only processing, I will have to set
> the buffer size to zero (ie, send one item at a time).
> 
> Is there any alternative configuration or suggestion on how I can
> achieve once only updates using a batch mode with partial failures ?
> 
> Thanks
> 
> Mans
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


  

Re: Flink - Once and once processing

2016-08-01 Thread M Singh
Thanks Till.  I will take a look at your pointers.  Mans 

On Monday, August 1, 2016 6:27 AM, Till Rohrmann  
wrote:
 

 Hi Mans,
Milind is right that in general external systems have to play along if you want 
to achieve exactly once processing guarantees while writing to these systems. 
Either by supporting idempotent operations or by allowing to roll back their 
state.
In the batch world, this usually means to overwrite data from a previously 
failed execution run completely or having a unique key which does not change 
across runs.
In the case of streaming we can achieve exactly once guarantees by committing 
the data to the external system only after we have taken a checkpoint and 
buffering the data in between. This guarantees that the changes are only 
materialized after we are sure that we can go back to a checkpoint where we've 
already seen all the elements which might have caused the sink output. You can 
take a look at the CassandraSink where we're exactly doing this.
Cheers,Till
On Sun, Jul 31, 2016 at 2:59 AM, milind parikh  wrote:

Flink operates in conjunction with sources and sinks. So ,yes, there are things 
that an underlying sink  (or a source) must support in conjunction with   Flink 
to enable a particular semantic.On Jul 30, 2016 11:46 AM, "M Singh" 
 wrote:

Thanks Konstantin.
Just to clarify - unless the target database is resilient to duplicates, 
Flink's once-only configuration will not avoid duplicate updates.
Mans 

On Saturday, July 30, 2016 7:40 AM, Konstantin Knauf 
 wrote:
 

 Hi Mans,

depending on the number of operations and the particular database, you
might be able to use transactions.

Maybe you can also find a data model, which is more resilient to these
kind of failures.

Cheers,

Konstantin

On 29.07.2016 19:26, M Singh wrote:
> Hi:
> 
> I have a use case where we need to update a counter in a db and for this
> need to guarantee once only processing.  If we have some entries in a
> batch and it partially updates the counters and then fails, if Flink
> retries the processing for that batch, some of the counters will be
> updated twice (the ones which succeeded in the first batch).
> 
> I think in order to guarantee once only processing, I will have to set
> the buffer size to zero (ie, send one item at a time).
> 
> Is there any alternative configuration or suggestion on how I can
> achieve once only updates using a batch mode with partial failures ?
> 
> Thanks
> 
> Mans
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


   




  

Flink : CEP processing

2016-08-09 Thread M Singh
Hey Folks:
I have a question about CEP processing in Flink - How does flink processing 
work when we have multiple partitions in which the events used in the pattern 
sequence might be scattered across multiple partitions on multiple nodes ?
Thanks for your insight.
Mans

Re: Flink : CEP processing

2016-08-09 Thread M Singh
Thanks Sameer.
So does that mean that if the events keys are not same we cannot use the CEP 
pattern match ?  What if events are coming from different sources and need to 
be correlated ?
Mans 

On Tuesday, August 9, 2016 9:40 AM, Sameer W  wrote:
 

 Hi,
You will need to use keyBy operation first to get all the events you need 
monitored in a pattern on the same node. Only then can you apply Pattern 
because it depends on the order of the events (first, next, followed by). I 
even had to make sure that the events were correctly sorted by timestamps to 
ensure that the first,next and followed by works correctly.
Sameer
On Tue, Aug 9, 2016 at 12:17 PM, M Singh  wrote:

Hey Folks:
I have a question about CEP processing in Flink - How does flink processing 
work when we have multiple partitions in which the events used in the pattern 
sequence might be scattered across multiple partitions on multiple nodes ?
Thanks for your insight.
Mans



  

Re: Flink : CEP processing

2016-08-09 Thread M Singh
Hi Sameer:
If we use a within window for event series - 
1. Does it interfere with the default time windows ?2. How does it affect 
snapshotting ?  3. If the window is too large are the events stored in a 
"processor" for the window to expire ?4. Are there any other know limitations 
and best practices of using CEP with Flink ?
Thanks again for your help.
 

On Tuesday, August 9, 2016 11:29 AM, Sameer Wadkar  
wrote:
 

 In that case you need to get them into one stream somehow (keyBy a dummy value 
for example). There is always some logical key to keyBy on when data is 
arriving from multiple sources (ex some portion of the time stamp). 
You are looking for patterns within something (events happening around the same 
time but arriving from multiple devices). That something should be the key. 
That's how I am using it. 

Sameer
Sent from my iPhone
On Aug 9, 2016, at 1:40 PM, M Singh  wrote:


Thanks Sameer.
So does that mean that if the events keys are not same we cannot use the CEP 
pattern match ?  What if events are coming from different sources and need to 
be correlated ?
Mans 

On Tuesday, August 9, 2016 9:40 AM, Sameer W  wrote:
 

 Hi,
You will need to use keyBy operation first to get all the events you need 
monitored in a pattern on the same node. Only then can you apply Pattern 
because it depends on the order of the events (first, next, followed by). I 
even had to make sure that the events were correctly sorted by timestamps to 
ensure that the first,next and followed by works correctly.
Sameer
On Tue, Aug 9, 2016 at 12:17 PM, M Singh  wrote:

Hey Folks:
I have a question about CEP processing in Flink - How does flink processing 
work when we have multiple partitions in which the events used in the pattern 
sequence might be scattered across multiple partitions on multiple nodes ?
Thanks for your insight.
Mans



   


  

Re: Flink : CEP processing

2016-08-10 Thread M Singh
Thanks for the pointers Sameer.

The reason I wanted to find out about snapshotting with CEP is because I 
thought that CEP state might also be snapshotted for recovery. If that is the 
case, then there are events in the CEP might be in two snapshots.
Mans 

On Tuesday, August 9, 2016 1:15 PM, Sameer W  wrote:
 

 In one of the earlier thread Till explained this to me 
(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CEP-and-Within-Clause-td8159.html)
1. Within does not use time windows. It sort of uses session windows where the 
session begins when the first event of the pattern is identified. The timer 
starts when the "first" event in the pattern fires. If the pattern completes 
"within" the designated times (meaning the "next" and "followed by" fire as 
will "within" the time specified) you have a match or else the window is 
removed. I don't know how it is implemented but I doubt it stores all the 
events in memory for the "within" window (there is not need to). It will only 
store the relevant events (first, next, followed by, etc). So memory would not 
be an issue here. If two "first" type events are identified I think two 
"within" sessions are created.
2. Snapshotting (I don't know much in this area so I cannot answer). Why should 
it be different though? You are using operators and state. It should work the 
same way. But I am not too familiar with that.
3. The "Within" window is not an issue. Even the window preceding that should 
not be unless you are using WindowFunction (more memory friendly alternative is 
https://ci.apache.org/ projects/flink/flink-docs- 
master/apis/streaming/windows. html#window-functions ) by themselves and using 
a really large window
4. The way I am using it, it is working fine. Some of the limitations I have 
seen are related to this paper not being fully implemented 
(https://people.cs.umass.edu/ ~yanlei/publications/sase- sigmod08.pdf). I don't 
know how to support negation in an event stream but I don't need it for now.
Thanks,Sameer

On Tue, Aug 9, 2016 at 3:45 PM, M Singh  wrote:

Hi Sameer:
If we use a within window for event series - 
1. Does it interfere with the default time windows ?2. How does it affect 
snapshotting ?  3. If the window is too large are the events stored in a 
"processor" for the window to expire ?4. Are there any other know limitations 
and best practices of using CEP with Flink ?
Thanks again for your help.
 

On Tuesday, August 9, 2016 11:29 AM, Sameer Wadkar  
wrote:
 

 In that case you need to get them into one stream somehow (keyBy a dummy value 
for example). There is always some logical key to keyBy on when data is 
arriving from multiple sources (ex some portion of the time stamp). 
You are looking for patterns within something (events happening around the same 
time but arriving from multiple devices). That something should be the key. 
That's how I am using it. 

Sameer
Sent from my iPhone
On Aug 9, 2016, at 1:40 PM, M Singh  wrote:


Thanks Sameer.
So does that mean that if the events keys are not same we cannot use the CEP 
pattern match ?  What if events are coming from different sources and need to 
be correlated ?
Mans 

On Tuesday, August 9, 2016 9:40 AM, Sameer W  wrote:
 

 Hi,
You will need to use keyBy operation first to get all the events you need 
monitored in a pattern on the same node. Only then can you apply Pattern 
because it depends on the order of the events (first, next, followed by). I 
even had to make sure that the events were correctly sorted by timestamps to 
ensure that the first,next and followed by works correctly.
Sameer
On Tue, Aug 9, 2016 at 12:17 PM, M Singh  wrote:

Hey Folks:
I have a question about CEP processing in Flink - How does flink processing 
work when we have multiple partitions in which the events used in the pattern 
sequence might be scattered across multiple partitions on multiple nodes ?
Thanks for your insight.
Mans



   


   



  

Apache Flink - How to destroy global window and release it's resources

2019-04-10 Thread M Singh
Hi:
I have a use case where I need to create a global window where I need to wait 
for unknown time for certain events for a particular key.  I understand that I 
can create a global window and use a custom trigger to initiate the function 
computation.  But I am not sure how to destroy the window after the triggering 
conditions is satisfied and the the events are purged.
If there is any better way of dealing with this situation, please let me know.
Thanks
Mans


Re: Apache Flink - Question about broadcast state pattern usage

2019-04-10 Thread M Singh
 Hi Guowei;

Thanks for your answer.
Do you have any example which illustrates using broadcast is used with multiple 
descriptors ?
Thanks


On Sunday, April 7, 2019, 10:10:15 PM EDT, Guowei Ma  
wrote:  
 
 Hi1. I think you could use "Using Managed Operator State"[1] 
(context.getOperatorStateStore().getBroadcastState()) to use the 
BroadCastState.  But you must use it very carefully and guarantee the semantics 
of broadcast state yourself. I think "The Broadcast State Pattern"[2] is some 
best practice for using broadcast state.2. The broadcast function is varargs. 
Since that you could pass multiple MapStateDescriptors to it.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#using-managed-operator-state[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html

Best,Guowei

M Singh  于2019年4月7日周日 下午10:17写道:

Hi Flink folks:
I am reading the documentation on broadcast state pattern 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html)
 and have following questions:
1. Point number 2 - '2. it is only available to specific operators that have as 
inputs a broadcasted stream and a non-broadcasted one,'.  From what I 
understand it can be used with connected streams.  Is there any other operator 
where it can be used ?

2. Point number 3 - '3. such an operator can have multiple broadcast states 
with different names.'.  Is there any additional documentation/example on how 
to implement/use multiple broadcast states ?
Thanks
Mans


  

Re: Apache Flink - How to destroy global window and release it's resources

2019-04-12 Thread M Singh
 Hi Fabian/Guowei:  

Thanks for your pointers.   

Fabian, as you pointed out, global window is never completely removed since 
it's end time is Long.MAX_VALUE, and that is my concern.  So, is there any 
other way of clean up the now purged global windows ?
Thanks again.



On Thursday, April 11, 2019, 4:16:24 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi,
As far as I know, a window is only completely removed when time (event or 
processing time, depending on the window type) passes the window's end 
timestamp.Since, GlobalWindow's end timestamp is Long.MAX_VALUE, it is never 
completely removed.I'm not 100% sure what state is kept around. It might not be 
keyed state but just objects on the heap but not absolutely sure.

Aljoscha (in CC) should know the details here.
Best, Fabian

Am Do., 11. Apr. 2019 um 08:07 Uhr schrieb Guowei Ma :

Hi,I think you could return a proper TriggerResult, which defines how to deal 
with the window elements after computing a window in your trigger 
implementation. You could find the detail information from the doc[1].
1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#fire-and-purge
Best,Guowei

M Singh  于2019年4月11日周四 上午1:42写道:

Hi:
I have a use case where I need to create a global window where I need to wait 
for unknown time for certain events for a particular key.  I understand that I 
can create a global window and use a custom trigger to initiate the function 
computation.  But I am not sure how to destroy the window after the triggering 
conditions is satisfied and the the events are purged.
If there is any better way of dealing with this situation, please let me know.
Thanks
Mans


  

Re: Apache Flink - Question about broadcast state pattern usage

2019-04-12 Thread M Singh
 Hi Fabian:  Thanks for your answer. 

>From my understanding (please correct me), in the example above, we are 
>passing map descriptors to the same broadcast stream.  So, the elements/items 
>in that stream will be the same.  The only difference would be that in the 
>processBroadcastElement method of the KeyedBroadcastProcessFunction impl, we 
>could add different mappings of broadcast element (from the same broadcasted 
>stream) to different map states. I am looking at the documentation example 
>(https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html)
>  and still not sure how that will help ?
Thanks for your help.
Mans





On Thursday, April 11, 2019, 3:53:59 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi,
you would simply pass multiple MapStateDescriptors to the broadcast method:
MapStateDescriptor bcState1 = ...
MapStateDescriptor bcState2 = ...

DataStream stream = ...
BroadcastStream bcStream = stream.broadcast(bcState1, bcState2);
Best,Fabian


Am Mi., 10. Apr. 2019 um 19:44 Uhr schrieb M Singh :

 Hi Guowei;

Thanks for your answer.
Do you have any example which illustrates using broadcast is used with multiple 
descriptors ?
Thanks


On Sunday, April 7, 2019, 10:10:15 PM EDT, Guowei Ma  
wrote:  
 
 Hi1. I think you could use "Using Managed Operator State"[1] 
(context.getOperatorStateStore().getBroadcastState()) to use the 
BroadCastState.  But you must use it very carefully and guarantee the semantics 
of broadcast state yourself. I think "The Broadcast State Pattern"[2] is some 
best practice for using broadcast state.2. The broadcast function is varargs. 
Since that you could pass multiple MapStateDescriptors to it.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#using-managed-operator-state[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html

Best,Guowei

M Singh  于2019年4月7日周日 下午10:17写道:

Hi Flink folks:
I am reading the documentation on broadcast state pattern 
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/broadcast_state.html)
 and have following questions:
1. Point number 2 - '2. it is only available to specific operators that have as 
inputs a broadcasted stream and a non-broadcasted one,'.  From what I 
understand it can be used with connected streams.  Is there any other operator 
where it can be used ?

2. Point number 3 - '3. such an operator can have multiple broadcast states 
with different names.'.  Is there any additional documentation/example on how 
to implement/use multiple broadcast states ?
Thanks
Mans


  
  

Apache Flink - CEP vs SQL detecting patterns

2019-04-12 Thread M Singh
Hi:
I am looking at the documentation of the CEP and there is way to access 
patterns which have timeout.  But I could  not find similar capability in the 
Table and SQL interface detecting patterns.  I am assuming that the CEP 
interface is more comprehensive and complete than the SQL/Table interface.
Please let me know if I have missed anything.
Thanks
Mans


Re: Apache Flink - CEP vs SQL detecting patterns

2019-04-20 Thread M Singh
Dawid:
 
So, what happens when there is a timeout - is there any value/field in the 
resulting data stream that indicates that this was a timeout ?
Thanks

On Tuesday, April 16, 2019, 10:12:58 AM EDT, Dawid Wysakowicz 
 wrote:  
 
  
Hi Mans,
 
Yes you are right. That feature is not available in SQL, as there is no such 
feature in SQL standard. The only addition to SQL standard we introduce so far 
is the WITHIN clause. We might introduce the timed out patterns some time in 
the future, but personally I am not aware of such plans.
 
Best,
 
Dawid
 
 On 12/04/2019 22:40, M Singh wrote:
  
 
 Hi: 
  I am looking at the documentation of the CEP and there is way to access 
patterns which have timeout.  But I could  not find similar capability in the 
Table and SQL interface detecting patterns.  I am assuming that the CEP 
interface is more comprehensive and complete than the SQL/Table interface. 
  Please let me know if I have missed anything. 
  Thanks 
  Mans
 

Apache Flink - Question about dynamically changing window end time at run time

2019-04-23 Thread M Singh
Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread M Singh
 Hi Rong:
Thanks for your answer.
>From what I understand the dynamic gap session windows are also created when 
>the event is encountered.  I need to be able to change the window end time at 
>a later time based on what other events are in that window.  One way to do 
>this is to use GlobalWindows but then these are never deleted.

Regarding CEP option - I believe that CEP patterns cannot be changed 
dynamically once they've been complied which limits it usage.
 Please feel free to correct me. 

Thanks for your help and pointers.

On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong  
wrote:  
 
 Hi Mans,
I am not sure what you meant by "dynamically change the end-time of a window. 
If you are referring to dynamically determines the firing time of the window, 
then it fits into the description of session window [1]: If you want to handle 
window end time dynamically, one way of which I can think of is the dynamic 
gap, session window [1] approach. with which you can specify the end-time of a 
window based on input elements. Provided that you are maintaining a session 
window. Another way to look at it is through the Flink-CEP library [2]. 
Thanks,Rong

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans

  

Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-28 Thread M Singh
 Thanks Sameer/Rong:
As Fabian and you have mentioned, the window still sticks around forever for 
global window, so I am trying avoid that scenario.
Fabian & Flink team - do you have any insights into what would happen if I 
create a window and the later change it's end time during the stream processing 
?  Would it mess up any internal state/processing that uses the end time when 
the window was first created ?  If there is any other consideration to keep in 
mind, please let me know.
Thanks again.

On Wednesday, April 24, 2019, 1:29:18 PM EDT, Rong Rong 
 wrote:  
 
 Hi Mans,
Sameer is correct. if you would like to control window triggering based on 
other elements that does not belong to this window (in a keyed stream context) 
then this is probably the best way to approach. 
I think you've also posted in another thread that describes what will be left 
after fire-and-purge [1]. As Fabian stated: the only thing that might've left 
after is the window (which is the 2 long values indicate the start/end) and the 
trigger object. But you are right it might eventually filled up memory.
Another approach is to implement your own operator that handles all these 
internally by your user code. This would require you to replicate many of the 
window operator logic though.
Thanks,Rong
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212
On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:

Global Windows is fine for this use case. I have used the same strategy. You 
just define custom evictors and triggers and you are all good. Windows are 
managed by keys, so as such as long as events are evicted from the window, that 
counts towards reclaiming memory for the key+window combination. Plus there is 
just window per key with Global Windows. 
On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:

 Hi Rong:
Thanks for your answer.
>From what I understand the dynamic gap session windows are also created when 
>the event is encountered.  I need to be able to change the window end time at 
>a later time based on what other events are in that window.  One way to do 
>this is to use GlobalWindows but then these are never deleted.

Regarding CEP option - I believe that CEP patterns cannot be changed 
dynamically once they've been complied which limits it usage.
 Please feel free to correct me. 

Thanks for your help and pointers.

On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong  
wrote:  
 
 Hi Mans,
I am not sure what you meant by "dynamically change the end-time of a window. 
If you are referring to dynamically determines the firing time of the window, 
then it fits into the description of session window [1]: If you want to handle 
window end time dynamically, one way of which I can think of is the dynamic 
gap, session window [1] approach. with which you can specify the end-time of a 
window based on input elements. Provided that you are maintaining a session 
window. Another way to look at it is through the Flink-CEP library [2]. 
Thanks,Rong

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans

  

  

Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-29 Thread M Singh
 Sounds great Fabian.  

I was just trying to see if I can use higher level datastream apis.  

I appreciate your advice and help.  

Mans

On Monday, April 29, 2019, 5:41:36 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi Mans,
I don't know if that would work or not. Would need to dig into the source code 
for that. 

TBH, I would recommend to check if you can implement the logic using a 
(Keyed-)ProcessFunction.IMO, process functions are a lot easier to reason about 
than Flink's windowing framework. 
You can manage state and timer all by yourself and make sure everything is 
properly cleaned up.

Best,Fabian


Am So., 28. Apr. 2019 um 16:31 Uhr schrieb M Singh :

 Thanks Sameer/Rong:
As Fabian and you have mentioned, the window still sticks around forever for 
global window, so I am trying avoid that scenario.
Fabian & Flink team - do you have any insights into what would happen if I 
create a window and the later change it's end time during the stream processing 
?  Would it mess up any internal state/processing that uses the end time when 
the window was first created ?  If there is any other consideration to keep in 
mind, please let me know.
Thanks again.

On Wednesday, April 24, 2019, 1:29:18 PM EDT, Rong Rong 
 wrote:  
 
 Hi Mans,
Sameer is correct. if you would like to control window triggering based on 
other elements that does not belong to this window (in a keyed stream context) 
then this is probably the best way to approach. 
I think you've also posted in another thread that describes what will be left 
after fire-and-purge [1]. As Fabian stated: the only thing that might've left 
after is the window (which is the 2 long values indicate the start/end) and the 
trigger object. But you are right it might eventually filled up memory.
Another approach is to implement your own operator that handles all these 
internally by your user code. This would require you to replicate many of the 
window operator logic though.
Thanks,Rong
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212
On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:

Global Windows is fine for this use case. I have used the same strategy. You 
just define custom evictors and triggers and you are all good. Windows are 
managed by keys, so as such as long as events are evicted from the window, that 
counts towards reclaiming memory for the key+window combination. Plus there is 
just window per key with Global Windows. 
On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:

 Hi Rong:
Thanks for your answer.
>From what I understand the dynamic gap session windows are also created when 
>the event is encountered.  I need to be able to change the window end time at 
>a later time based on what other events are in that window.  One way to do 
>this is to use GlobalWindows but then these are never deleted.

Regarding CEP option - I believe that CEP patterns cannot be changed 
dynamically once they've been complied which limits it usage.
 Please feel free to correct me. 

Thanks for your help and pointers.

On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong  
wrote:  
 
 Hi Mans,
I am not sure what you meant by "dynamically change the end-time of a window. 
If you are referring to dynamically determines the firing time of the window, 
then it fits into the description of session window [1]: If you want to handle 
window end time dynamically, one way of which I can think of is the dynamic 
gap, session window [1] approach. with which you can specify the end-time of a 
window based on input elements. Provided that you are maintaining a session 
window. Another way to look at it is through the Flink-CEP library [2]. 
Thanks,Rong

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans

  

  
  

Re: Emitting current state to a sink

2019-04-29 Thread M Singh
 Hi Avi:
Can you please elaborate (or include an example/code snippet) of how you were 
able to achieve collecting the keyed states from the processBroadcastElement 
method using the applyToKeyedState ?  

I am trying to understand which collector you used to emit the state since the 
broadcasted elements/state might be different from the non-broadcast 
elements/state.
Thanks for your help.

Mans
On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske 
 wrote:  
 
 Nice! 
Thanks for the confirmation :-)
Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi :

Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

This Message originated outside your organization.
Hi Avi,
I'm not sure if  you cannot emit data from the keyed state when you receive a 
broadcasted message.
The Context parameter of the processBroadcastElement() method in the 
KeyedBroadcastProcessFunction has the applyToKeyedState() method.The method 
takes a KeyedStateFunction that is applied to each key of a state, but does not 
provide a Collector to emit data.Maybe you can pass the collector to the 
KeyedStateFunction and emit records while it iterates over the key space.

Best, Fabian

Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi :

Hi Timo,I defiantly did. but broadcasting a command and trying to address the 
persisted state (I mean the state of the data stream and not the broadcasted 
one) you get the exception that I wrote (java.lang.NullPointerException: No key 
set. This method should not be called outside of a keyed context). e.g doing 
something likeoverride def processBroadcastElement(value: BroadcastRequest, 
ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
Response]#Context, out: Collector[Response]): Unit = {
  value match {
case Command(StateCmd.Fetch, _) =>
  if (state.value() != null) {
ouout.collecy(state.value())
  }will yield that exception
BRAvi
On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

This Message originated outside your organization.

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.

Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

Am 26.04.19 um 07:57 schrieb Avi Levi:
> Hi,
> We have a keyed pipeline with persisted state.
> Is there a way to broadcast a command and collect all values that 
> persisted in  the state ?
>
> The end result can be for example sending a fetch command to all 
> operators and emitting the results to some sink
>
> why do we need it ? from time to time we might want to check if we are 
> missing keys what are the additional keys or simply emit the current 
> state to a table and to query it.
>
> I tried simply broadcasting a command and addressing the persisted 
> state but that resulted with:
> java.lang.NullPointerException: No key set. This method should not be 
> called outside of a keyed context.
>
> is there a good way to achieve that ?
>
> Cheers
> Avi





  

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-29 Thread M Singh
 Hi An0:
Here is my understanding - each operator has the watermark which is the lowest 
of all it's input streams. When the watermark for an operator is updated, the 
lowest one becomes the new watermark for that operator and is fowarded to the 
output streams for that operator.  So, if one of the stream's watermark is the 
not updated, it might keep the operator's watermark to move forward, thereby 
affecting the watermark emitted to the following operators.

Here is the description for how watermarks work - 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
Hope that helps.







On Monday, April 29, 2019, 2:06:12 PM EDT, an0  wrote:  
 
 Thanks very much. It definitely explains the problem I'm seeing. However, 
something I need to confirm:
You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in 
assingTimestampsAndWatermarks.keyBy.window, it doesn't matter what data flows 
through a specific key's stream, all key streams have the same watermarks? So 
time-wise, `window` behaves as if `keyBy` is not there at all?

On 2019/04/26 06:34:10, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Watermarks are meta events that travel independently of data events.
> 
> 1) If you assingTimestampsAndWatermarks before keyBy, all parallel
> instances of trips have some data(this is my assumption) so Watermarks
> can be generated. Afterwards even if some of the keyed partitions have
> no data, Watermarks are broadcasted/forwarded anyway. In other words if
> at some point Watermarks were generated for all partitions of a single
> stage, they will be forwarded beyond this point.
> 
> 2) If you assingTimestampsAndWatermarks after keyBy, you try to assign
> watermarks for an empty partition which produces no Watermarks at all
> for this partition, therefore there is no progress beyond this point.
> 
> I hope this clarifies it a bit.
> 
> Best,
> 
> Dawid
> 
> On 25/04/2019 16:49, an0 wrote:
> > If my understanding is correct, then why `assignTimestampsAndWatermarks` 
> > before `keyBy` works? The `timeWindowAll` stream's input streams are task 1 
> > and task 2, with task 2 idling, no matter whether 
> > `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether 
> > task 2 receives elements only depends on the key distribution, has nothing 
> > to do with timestamp assignment, right?
> >
> >                                                                             
> >           /key 1 trips\
> >                                                                             
> >         /                    \  
> > (A) trips--> assignTimestampsAndWatermarks-->keyBy                    
> > timeWindowAll
> >                                                                             
> >         \      idle        /
> >                                                                             
> >           \key 2 trips/
> >
> >                            /key 1 trips--> assignTimestampsAndWatermarks\
> >                          /                                                  
> >                              \  
> > (B) trips-->keyBy                                                           
> >                      timeWindowAll
> >                          \      idle                                        
> >                            /
> >                            \key 2 trips--> assignTimestampsAndWatermarks/
> >
> > How things are different between A and B from `timeWindowAll`'s perspective?
> >
> > BTW, thanks for the webinar link, I'll check it later.
> >
> > On 2019/04/25 08:30:20, Dawid Wysakowicz  wrote: 
> >> Hi,
> >>
> >> Yes I think your explanation is correct. I can also recommend Seth's
> >> webinar where he talks about debugging Watermarks[1]
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1]
> >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> >>
> >> On 22/04/2019 22:55, an0 wrote:
> >>> Thanks, I feel I'm getting closer to the truth. 
> >>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I 
> >>> get 2 tasks running after `keyBy` if even all elements have the same key 
> >>> so go to 1 down stream(say task 1)? And it is the other task(task 2) with 
> >>> no incoming data that caused the `timeWindowAll` stream unable to 
> >>> progress? Because both task 1 and task 2 are its input streams and one is 
> >>> idling so its event time cannot make progress?
> >>>
> >>> On 2019/04/22 01:57:39, Guowei Ma  wrote: 
>  HI,
> 
>  BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
>  receives an element.
> 
>  For after Keyby:
>  Flink uses the HashCode of key and the parallelism of down stream to 
>  decide
>  which subtask would receive the element. This means if your key is always
>  same, all the sources will only send the elements to the same down stream
>  task, for example only no. 3 BoundedOutOfOrd

Re: Emitting current state to a sink

2019-04-30 Thread M Singh
 Thanks Avi for your help.  Mans

On Tuesday, April 30, 2019, 5:57:51 AM EDT, Avi Levi 
 wrote:  
 
 Sure! 
you get the context and the collector in the processBroadcastElement method see 
snippet below 
  override def processBroadcastElement(value: BroadcastRequest, ctx: 
KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
String]#Context, out: Collector[String]): Unit = {
   
ctxctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, 
ValueState[String]] {  override def process(key: String, state: 
ValueState[String]): Unit = Option(state.value()).foreach(s => out.collect(s))
  }) ... 
 }
On Mon, Apr 29, 2019 at 5:45 PM M Singh  wrote:

This Message originated outside your organization.
 Hi Avi:
Can you please elaborate (or include an example/code snippet) of how you were 
able to achieve collecting the keyed states from the processBroadcastElement 
method using the applyToKeyedState ?  

I am trying to understand which collector you used to emit the state since the 
broadcasted elements/state might be different from the non-broadcast 
elements/state.
Thanks for your help.

Mans
On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske 
 wrote:  
 
 Nice! 
Thanks for the confirmation :-)
Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi :

Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

This Message originated outside your organization.
Hi Avi,
I'm not sure if  you cannot emit data from the keyed state when you receive a 
broadcasted message.
The Context parameter of the processBroadcastElement() method in the 
KeyedBroadcastProcessFunction has the applyToKeyedState() method.The method 
takes a KeyedStateFunction that is applied to each key of a state, but does not 
provide a Collector to emit data.Maybe you can pass the collector to the 
KeyedStateFunction and emit records while it iterates over the key space.

Best, Fabian

Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi :

Hi Timo,I defiantly did. but broadcasting a command and trying to address the 
persisted state (I mean the state of the data stream and not the broadcasted 
one) you get the exception that I wrote (java.lang.NullPointerException: No key 
set. This method should not be called outside of a keyed context). e.g doing 
something likeoverride def processBroadcastElement(value: BroadcastRequest, 
ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, 
Response]#Context, out: Collector[Response]): Unit = {
  value match {
case Command(StateCmd.Fetch, _) =>
  if (state.value() != null) {
ouout.collecy(state.value())
  }will yield that exception
BRAvi
On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

This Message originated outside your organization.

Hi Avi,

did you have a look at the .connect() and .broadcast() API 
functionalities? They allow you to broadcast a control stream to all 
operators. Maybe this example [1] or other examples in this repository 
can help you.

Regards,
Timo

[1] 
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java

Am 26.04.19 um 07:57 schrieb Avi Levi:
> Hi,
> We have a keyed pipeline with persisted state.
> Is there a way to broadcast a command and collect all values that 
> persisted in  the state ?
>
> The end result can be for example sending a fetch command to all 
> operators and emitting the results to some sink
>
> why do we need it ? from time to time we might want to check if we are 
> missing keys what are the additional keys or simply emit the current 
> state to a table and to query it.
>
> I tried simply broadcasting a command and addressing the persisted 
> state but that resulted with:
> java.lang.NullPointerException: No key set. This method should not be 
> called outside of a keyed context.
>
> is there a good way to achieve that ?
>
> Cheers
> Avi





  
  

Apache Flink - How to pass configuration params in the flink-config.yaml file to local execution environment

2019-05-19 Thread M Singh
Hey Flink Folks:
I was trying to find out how can pass the params in the flink-config.yaml file 
to a application running in local env. 

Can I create a flink-config.yaml file and include it in the class path ? Or can 
I pass the parameters via the parameter tool ?
Please let me know if there is any documentation on this.

Thanks.




Apache Flink - Disabling system metrics and collecting only specific metrics

2019-06-11 Thread M Singh
Hi:
I am working on an application and need to collect application metrics. I would 
like to use Flink's metrics framework for my application metrics.  From Flink's 
documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 it looks like Flink collects system metrics by default but I don't need those.
Is there any way to configure metrics to 
1. disable system metrics collection, 2. collect only specific metrics.
If there is any documentation/configuration that I might have missed, please 
let me know.
Thanks
Mans



Re: Apache Flink - Disabling system metrics and collecting only specific metrics

2019-06-12 Thread M Singh
 Thanks Zhijiang for your response.  
I see that system resources reporting can be enable (default not enabled) but 
not system metrics.  I just wanted to confirm that I am not missing anything.
Thanks again.
On Tuesday, June 11, 2019, 10:32:51 PM EDT, zhijiang 
 wrote:  
 
 Hi Mans,
AFAIK, we have no switch to disable general system metrics which are useful for 
monitoring status and performance tuning. Only some advanced system metrics 
could be confgiured to enable or not, and the default config is always 
disabled, so you do not need toconcern them.
Maybe you could implement a custom MetricReporter, and then only consentrate on 
your required application metrics in the method of  
`MetricReporter#notifyOfAddedMetric` to show them in backend.
Best,Zhijiang

--From:M Singh 
Send Time:2019年6月12日(星期三) 00:30To:User 
Subject:Apache Flink - Disabling system metrics and 
collecting only specific metrics
Hi:
I am working on an application and need to collect application metrics. I would 
like to use Flink's metrics framework for my application metrics.  From Flink's 
documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 it looks like Flink collects system metrics by default but I don't need those.
Is there any way to configure metrics to 
1. disable system metrics collection, 2. collect only specific metrics.
If there is any documentation/configuration that I might have missed, please 
let me know.
Thanks
Mans



  

Apache Flink - Question about metric registry and reporter and context information

2019-06-14 Thread M Singh
Hi:
I wanted to find if the metric reporter and registry are instantiated per task 
manager (which is a single JVM process) or per slot.  I believe it per task 
manager (JVM process) but just wanted to confirm.
Also, is there a way to access context information (eg: task manager name etc) 
in the metric reporter or registry just like in the rich function open method ?
Thanks

Apache Flink Sql - How to access EXPR$0, EXPR$1 values from a Row in a table

2019-06-14 Thread M Singh
Hi:
I am working with Flink Sql and have a table with the following schema:
root |-- name: String |-- idx: Integer |-- pos: String |-- tx: Row(EXPR$0: 
Integer, EXPR$1: String)
How can I access the attributes tx.EXPR$0 and tx.EXPR$1 ?
I tried the following (the table is registered as 'tupleTable')
        Table tuples = myTableFuntionResultTuple.select("select name, idx, pos, 
tx.EXPR$0, tx.EXPR$1 from tupleTable");
but I get the following exception
Exception in thread "main" 
org.apache.flink.table.api.ExpressionParserException: Could not parse 
expression at column 8: `,' expected but `n' foundselect name, idx, pos, 
tx.EXPR$0, tx.EXPR$1 from tupleTable
Please let me know how if there is any documentation or samples for accessing 
the tuples values in a table.
Thanks
Mans

Re: Apache Flink - How to pass configuration params in the flink-config.yaml file to local execution environment

2019-06-26 Thread M Singh
 Hey Folks:  Just wanted to see if you have any advice on this issue of passing 
config parameters to the application.  I've tried passing parameters by using

ParameterTool parameterTool = 
ParameterTool.fromMap(config);StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameterTool);
But the parameters in the parameter argument are not picked up by the 
application.
Thanks again for your help.
Mans
On Sunday, May 19, 2019, 02:50:03 PM EDT, M Singh  
wrote:  
 
 Hey Flink Folks:
I was trying to find out how can pass the params in the flink-config.yaml file 
to a application running in local env. 

Can I create a flink-config.yaml file and include it in the class path ? Or can 
I pass the parameters via the parameter tool ?
Please let me know if there is any documentation on this.

Thanks.


  

Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-26 Thread M Singh
Hi:
I have a single EMR cluster with Flink and want to run multiple applications on 
it with different flink configurations.  Is there a way to 
1. Pass the config file name for each application, or2. Overwrite the config 
parameters via command line arguments for the application.  This is similar to 
how we can overwrite the default parameters in spark
I searched the documents and have tried using ParameterTool with the config 
parameter names, but it has not worked as yet.
Thanks for your help.
Mans

Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread M Singh
Hi:
I need to collect application metrics which are counts (per unit of time eg: 
minute)  for certain events.  There are two ways of doing this:
1. Create separate streams (using split stream etc) in the application 
explicitly, then aggregate the counts in a window and save them.  This mixes 
metrics collection with application logic and making the application logic 
complex.2. Use Flink metrics framework (counter, guage, etc) to save metrics
I have a very small test with 2 events but when I run the application the 
counters are not getting saved (they show value 0) even though that part of the 
code is being executed.  I do see the numRecordsIn counters being updated from 
the source operator.  I've also tried incrementing the count by 10 (instead of 
1) every time the function gets execute but still the counts remain 0.
Here is snippet of the code:
dataStream.map(new RichMapFunction() {
            protected Counter counter;
            public void open(Configuration parameters) {                counter 
= getRuntimeContext().getMetricGroup().addGroup("test", 
"split").counter("success");            }            @Override            
public String map(String value) throws Exception {                
counter.inc();                return value;            }        });

As I mentioned, I do get the success metric count but the value is always 0, 
even though the above map function was executed.  
My questions are:
1. Are there any issues regarding counters being approximate ?2. If I want to 
collect accurate counts, is it recommended to use counters or should I do it 
explicitly (which is making the code too complex) ?3. Do counters participate 
in flink's failure/checkpointing/recovery ?4. Is there any better way of 
collecting application metric counts ?
Thanks
Mans

Re: Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread M Singh
 Hi Chesnay:
Thanks for your response.
My job runs for a few minutes and i've tried setting the reporter interval to 1 
second.
I will try the counter on a longer running job.
Thanks again.
On Thursday, June 27, 2019, 11:46:17 AM EDT, Chesnay Schepler 
 wrote:  
 
  1) None that I'm aware of.
 2) You should use counters.
 3) No, counters are not checkpointed, but you could store the value in state 
yourself.
 4) None that I'm aware of that doesn't require modifications to the 
application logic.
 
 How long does your job run for, and how do you access metrics?
 
 On 27/06/2019 17:36, M Singh wrote:
  
  Hi: 
  I need to collect application metrics which are counts (per unit of time eg: 
minute)  for certain events.  There are two ways of doing this: 
  1. Create separate streams (using split stream etc) in the application 
explicitly, then aggregate the counts in a window and save them.  This mixes 
metrics collection with application logic and making the application logic 
complex. 2. Use Flink metrics framework (counter, guage, etc) to save metrics 
  I have a very small test with 2 events but when I run the application the 
counters are not getting saved (they show value 0) even though that part of the 
code is being executed.  I do see the numRecordsIn counters being updated from 
the source operator.  I've also tried incrementing the count by 10 (instead of 
1) every time the function gets execute but still the counts remain 0. 
  Here is snippet of the code: 
dataStream.map(new RichMapFunction() { 
              protected Counter counter; 
              public void open(Configuration parameters) {                 
counter = getRuntimeContext().getMetricGroup().addGroup("test", 
"split").counter("success");             }             @Override             
public String map(String value) throws Exception {                 
counter.inc();                 return value;             }         });  
  
  As I mentioned, I do get the success metric count but the value is always 0, 
even though the above map function was executed.   
  My questions are: 
  1. Are there any issues regarding counters being approximate ? 2. If I want 
to collect accurate counts, is it recommended to use counters or should I do it 
explicitly (which is making the code too complex) ? 3. Do counters participate 
in flink's failure/checkpointing/recovery ? 4. Is there any better way of 
collecting application metric counts ? 
  Thanks 
  Mans  
 

 
   

Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-27 Thread M Singh
 Hi Xintong:  Thanks for your pointers.
I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do you 
know if there is a common way that would work both for emr, locally and ide ?
Thanks again.
On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song 
 wrote:  
 
 Hi Singh,
You can use the environment variable "FLINK_CONF_DIR" to specify path to the 
directory of config files. You can also override config options with command 
line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink run' 
command).

Thank you~

Xintong Song




On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:

Hi:
I have a single EMR cluster with Flink and want to run multiple applications on 
it with different flink configurations.  Is there a way to 
1. Pass the config file name for each application, or2. Overwrite the config 
parameters via command line arguments for the application.  This is similar to 
how we can overwrite the default parameters in spark
I searched the documents and have tried using ParameterTool with the config 
parameter names, but it has not worked as yet.
Thanks for your help.
Mans
  

Apache Flink - Multiple Kinesis stream consumers

2019-07-01 Thread M Singh
Hi:
I am trying to understand how does Flink coordinate multiple kinesis consumers 
and am trying to find out:
1. Is it possible to read same kinesis stream independently multiple times 
within a single application ? How does Flink coordinate consuming same kinesis 
multiple times in a single application ? Are there any issues that can arise 
from this pattern ?2. How does Flink coordinate consumers of same kinesis 
stream across multiple applications ? 
Thanks
Mans

Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-07-01 Thread M Singh
 Thanks Jeff and Xintong for your pointers.
On Friday, June 28, 2019, 10:44:35 AM EDT, Jeff Zhang  
wrote:  
 
 
This is due to flink doesn't unify the execution in different enviroments. The 
community has discuss it before about how to enhance the flink client api. The 
initial proposal is to introduce FlinkConf which contains all the configuration 
so that we can unify the executions in all environments (IDE, CLI, SQL Client, 
Scala Shell, downstream project)
Here's the sample code:

val conf = new FlinkConf().setProperty(“key_1”, “value_1”)     // create 
FlinkConf

val env = new ExecutionEnvironment(conf)   // create ExecutionEnvironment

val jobId = env.submit(...)           // non-blocking job submission (detached 
mode)

val jobStatus = env.getClusterClient().queryJobStatus(jobId)   // approach 1: 
query job status via ClusterClient

 val jobStatus = env.queryJobStatus(jobId)   // approach 2: query job status 
via ExecutionEnvironment.


And you can refer this for more details:
https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing




Xintong Song  于2019年6月28日周五 下午10:28写道:

Hi, Singh,
I don't think that should work. The -D or -yD parameters needs to be passed to 
the Flink start-up scripts or the "flink run" command. I don't think the 
IntelliJ VM arguments are equivalent to that. In fact, I'm not aware of any 
method to set "-D" parameters when running jobs IDE.

Thank you~

Xintong Song




On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:

 Hi Xintong:
I passed the -Dparallelism.default=2 in the  run configuration VM arguments for 
IntelliJ.
So what I am looking for is a way to overwrite the config parameters which are 
defined in the flink-config.yaml file (parallelism.default is just an example) 
which would be picked up regardless of the env (eg: locally, on yarn or IDE).  
When I run the application in IDE (locally) with the above mentioned VM 
parameter, the StreamExecutionEnvironment.config does not show this value and 
the Flink UI shows configuration parameter parallelism as 8.  Is there any 
other place where I can see the parameter settings ?
Thanks.
On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song 
 wrote:  
 
 Could you provide some more details on how you run your job with -D options in 
IDE?




Thank you~

Xintong Song




On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:

 Hi Xintong:  Thanks for your pointers.
I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do you 
know if there is a common way that would work both for emr, locally and ide ?
Thanks again.
On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song 
 wrote:  
 
 Hi Singh,
You can use the environment variable "FLINK_CONF_DIR" to specify path to the 
directory of config files. You can also override config options with command 
line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink run' 
command).

Thank you~

Xintong Song




On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:

Hi:
I have a single EMR cluster with Flink and want to run multiple applications on 
it with different flink configurations.  Is there a way to 
1. Pass the config file name for each application, or2. Overwrite the config 
parameters via command line arguments for the application.  This is similar to 
how we can overwrite the default parameters in spark
I searched the documents and have tried using ParameterTool with the config 
parameter names, but it has not worked as yet.
Thanks for your help.
Mans
  
  



-- 
Best Regards

Jeff Zhang  

Apache Flink - How to find the number of window instances in an application

2019-07-04 Thread M Singh
Hi:
I wanted to find out if there is a metric to find out the the number of global 
or non-global window instances in a Flink application.
Thanks
Mans


Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-08 Thread M Singh
Hi:
I have a few questions about the stream time characteristics:
1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
timers in a processor or trigger is set using registerProcessingTimeTimer (or 
vice versa), then will that timer fire ?  
2.  Once the time character is set on the stream environment, and changed later 
in the application, which one is applied, the first one or the last one ?
3.  If the stream time characteristic is set to IngestionTime, then is there 
any adverse effect of assigning the timestamp using  
assignTimeStampAndWatermark to a stream later in the application ?
Thanks

Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-09 Thread M Singh
 Thanks Yun for your answers.
Does this mean that we can use processing and event timers (in processors or 
triggers) regardless of the time characteristic ?  Also, is possible to use 
both together and will they both fire at the appropriate watermarks for 
processing and event times ?  
Mans
On Tuesday, July 9, 2019, 12:18:30 AM EDT, Yun Gao  
wrote:  
 
 Hi,    For the three questions,  1. The processing time timer will be trigger. 
IMO you may think the processing time timer as in parallel with the event time 
timer. They are processed separately underlying. The processing time timer will 
be triggered according to the realistic time.  2. I'am not very clear on how to 
changed later in the application. Do you mean call 
`StreamExecutionEnvironment#setStreamTimeCharacteristics` multiple times ? If 
so, then the last call will take effect for all the operators before or after 
the last call, since the setting will only take effect in 
`StreamExecutionEnvironment#execute`.  3. 'assignTimeStampAndWatermark' will 
change the timestamp of the record. IMO you may think each record contains a 
timestamp field, and the filed is set when ingesting, but 
'assignTimeStampAndWatermark' will change the value of this field, so the 
following operators relying on the timestamp will see the updated value.
Best,Yun



--From:M Singh 
Send Time:2019 Jul. 9 (Tue.) 09:42To:User 
Subject:Apache Flink - Relation between stream time 
characteristic and timer triggers
Hi:
I have a few questions about the stream time characteristics:
1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
timers in a processor or trigger is set using registerProcessingTimeTimer (or 
vice versa), then will that timer fire ?  
2.  Once the time character is set on the stream environment, and changed later 
in the application, which one is applied, the first one or the last one ?
3.  If the stream time characteristic is set to IngestionTime, then is there 
any adverse effect of assigning the timestamp using  
assignTimeStampAndWatermark to a stream later in the application ?
Thanks

  

Apache Flink - Gauge implementation

2019-07-10 Thread M Singh
Hi:
I am working on a Flink application and need to collect the number of events of 
certain type/second (per unit of time) as a metric.  I was hoping to use 
Flink's metric framework with a gauge but did not find any implementation that 
will reset the count after it is reported.
The example: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#gauge
 shows how to use a gauge but does not reset the gauge after the metrics 
framework has reported the value every at the specified interval.   So, unless 
it is reset by the application (after the call to getValue), it will continue 
to increment.  
I was wondering if there is any resettable gauge implementation available that 
integrates with the Flink metrics framework.
Please let me know if there are any advice/pointers.
Thanks
Mans





Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-10 Thread M Singh
 Thanks for your answer Xingcan.
Just to clarify - if the characteristic is set to IngestionTime or 
ProcessingTime, the event time triggers will be ignored and not fire.
Mans
On Tuesday, July 9, 2019, 04:32:00 PM EDT, Xingcan Cui  
wrote:  
 
 Yes, Mans. You can use both processing-time and event-time timers if you set 
the time characteristic to event-time. They'll be triggered by their own time 
semantics, separately. (actually there’s no watermark for processing time)
Cheers,Xingcan

On Jul 9, 2019, at 11:40 AM, M Singh  wrote:
 Thanks Yun for your answers.
Does this mean that we can use processing and event timers (in processors or 
triggers) regardless of the time characteristic ?  Also, is possible to use 
both together and will they both fire at the appropriate watermarks for 
processing and event times ?  
Mans
On Tuesday, July 9, 2019, 12:18:30 AM EDT, Yun Gao  
wrote:  
 
 Hi,    For the three questions,  1. The processing time timer will be trigger. 
IMO you may think the processing time timer as in parallel with the event time 
timer. They are processed separately underlying. The processing time timer will 
be triggered according to the realistic time.  2. I'am not very clear on how to 
changed later in the application. Do you mean call 
`StreamExecutionEnvironment#setStreamTimeCharacteristics` multiple times ? If 
so, then the last call will take effect for all the operators before or after 
the last call, since the setting will only take effect in 
`StreamExecutionEnvironment#execute`.  3. 'assignTimeStampAndWatermark' will 
change the timestamp of the record. IMO you may think each record contains a 
timestamp field, and the filed is set when ingesting, but 
'assignTimeStampAndWatermark' will change the value of this field, so the 
following operators relying on the timestamp will see the updated value.
Best,Yun



--From:M Singh 
Send Time:2019 Jul. 9 (Tue.) 09:42To:User 
Subject:Apache Flink - Relation between stream time 
characteristic and timer triggers
Hi:
I have a few questions about the stream time characteristics:
1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
timers in a processor or trigger is set using registerProcessingTimeTimer (or 
vice versa), then will that timer fire ?  
2.  Once the time character is set on the stream environment, and changed later 
in the application, which one is applied, the first one or the last one ?
3.  If the stream time characteristic is set to IngestionTime, then is there 
any adverse effect of assigning the timestamp using  
assignTimeStampAndWatermark to a stream later in the application ?
Thanks

  

  

Re: Apache Flink - Gauge implementation

2019-07-10 Thread M Singh
 Xintong/Chesnay - Thanks for your response.
>From what I understand meter measures average throughput 
>(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#meter).
>  I would like to have the absolute count in each unit of interval.
Also, I am assuming that the metric call to getValue() is not synchronized with 
the processor/functions respective (process/map/etc) call.
Please let me know if I missed anything.
Thanks again.
Mans




On Wednesday, July 10, 2019, 04:21:15 AM EDT, Chesnay Schepler 
 wrote:  
 
  This can't be implemented as there's no guarantee that getValue is only 
called once. 
  Why do you want to reset the count? If you're interested in rates, why aren't 
you using a meter?
  
  On 10/07/2019 09:37, Xintong Song wrote:
  
 
Hi Singh, 
  Could your problem be solved by simply record the previous value and subtract 
it from the new value? 
   
Thank you~
 
Xintong Song
   
   
  On Wed, Jul 10, 2019 at 3:33 PM M Singh  wrote:
  
   Hi: 
  I am working on a Flink application and need to collect the number of events 
of certain type/second (per unit of time) as a metric.  I was hoping to use 
Flink's metric framework with a gauge but did not find any implementation that 
will reset the count after it is reported. 
  The example: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#gauge
 shows how to use a gauge but does not reset the gauge after the metrics 
framework has reported the value every at the specified interval.   So, unless 
it is reset by the application (after the call to getValue), it will continue 
to increment.   
  I was wondering if there is any resettable gauge implementation available 
that integrates with the Flink metrics framework. 
  Please let me know if there are any advice/pointers. 
  Thanks 
  Mans
 
  
  
 
  

 
   

Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-11 Thread M Singh
 Thanks Fabian for your response.
Just to clarify then - regardless of the time characteristics, if a processor 
or window trigger registers with a ProcessingTime  and EventTime  timers - they 
will all fire when the appropriate watermarks arrive.
Thanks again.
On Thursday, July 11, 2019, 05:41:54 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi Mans,
IngestionTime is uses the same internal mechanisms as EventTime (record 
timestamps and watermarks).

The difference is that instead of extracting a timestamp from the record (using 
a custom timestamp extractor & wm assigner), Flink will assign timestamps based 
on the machine clock of the machine that runs the source task and will also 
automatically generate watermarks. If you ask for my opinion, IngestionTime 
combines the disadvantages of ProcessingTime and EventTime. You pay the latency 
/ performance penalty of EventTime for the non-determinism of ProcessingTime.

So, if you enable IngestionTime, you can use EventTime timers and 
ProcessingTime timers.
Best, Fabian

Am Mi., 10. Juli 2019 um 09:37 Uhr schrieb M Singh :

 Thanks for your answer Xingcan.
Just to clarify - if the characteristic is set to IngestionTime or 
ProcessingTime, the event time triggers will be ignored and not fire.
Mans
On Tuesday, July 9, 2019, 04:32:00 PM EDT, Xingcan Cui  
wrote:  
 
 Yes, Mans. You can use both processing-time and event-time timers if you set 
the time characteristic to event-time. They'll be triggered by their own time 
semantics, separately. (actually there’s no watermark for processing time)
Cheers,Xingcan

On Jul 9, 2019, at 11:40 AM, M Singh  wrote:
 Thanks Yun for your answers.
Does this mean that we can use processing and event timers (in processors or 
triggers) regardless of the time characteristic ?  Also, is possible to use 
both together and will they both fire at the appropriate watermarks for 
processing and event times ?  
Mans
On Tuesday, July 9, 2019, 12:18:30 AM EDT, Yun Gao  
wrote:  
 
 Hi,    For the three questions,  1. The processing time timer will be trigger. 
IMO you may think the processing time timer as in parallel with the event time 
timer. They are processed separately underlying. The processing time timer will 
be triggered according to the realistic time.  2. I'am not very clear on how to 
changed later in the application. Do you mean call 
`StreamExecutionEnvironment#setStreamTimeCharacteristics` multiple times ? If 
so, then the last call will take effect for all the operators before or after 
the last call, since the setting will only take effect in 
`StreamExecutionEnvironment#execute`.  3. 'assignTimeStampAndWatermark' will 
change the timestamp of the record. IMO you may think each record contains a 
timestamp field, and the filed is set when ingesting, but 
'assignTimeStampAndWatermark' will change the value of this field, so the 
following operators relying on the timestamp will see the updated value.
Best,Yun



--From:M Singh 
Send Time:2019 Jul. 9 (Tue.) 09:42To:User 
Subject:Apache Flink - Relation between stream time 
characteristic and timer triggers
Hi:
I have a few questions about the stream time characteristics:
1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
timers in a processor or trigger is set using registerProcessingTimeTimer (or 
vice versa), then will that timer fire ?  
2.  Once the time character is set on the stream environment, and changed later 
in the application, which one is applied, the first one or the last one ?
3.  If the stream time characteristic is set to IngestionTime, then is there 
any adverse effect of assigning the timestamp using  
assignTimeStampAndWatermark to a stream later in the application ?
Thanks

  

  
  

Re: Apache Flink - Relation between stream time characteristic and timer triggers

2019-07-11 Thread M Singh
 Thanks Fabian/Xingcan/Yun for all your help.  Mans
On Thursday, July 11, 2019, 11:46:42 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi,
ProcessingTime timers are always supportedEventTime timers are only supported 
for EventTime and IngestionTime
Best, Fabian

Am Do., 11. Juli 2019 um 17:44 Uhr schrieb M Singh :

 Thanks Fabian for your response.
Just to clarify then - regardless of the time characteristics, if a processor 
or window trigger registers with a ProcessingTime  and EventTime  timers - they 
will all fire when the appropriate watermarks arrive.
Thanks again.
On Thursday, July 11, 2019, 05:41:54 AM EDT, Fabian Hueske 
 wrote:  
 
 Hi Mans,
IngestionTime is uses the same internal mechanisms as EventTime (record 
timestamps and watermarks).

The difference is that instead of extracting a timestamp from the record (using 
a custom timestamp extractor & wm assigner), Flink will assign timestamps based 
on the machine clock of the machine that runs the source task and will also 
automatically generate watermarks. If you ask for my opinion, IngestionTime 
combines the disadvantages of ProcessingTime and EventTime. You pay the latency 
/ performance penalty of EventTime for the non-determinism of ProcessingTime.

So, if you enable IngestionTime, you can use EventTime timers and 
ProcessingTime timers.
Best, Fabian

Am Mi., 10. Juli 2019 um 09:37 Uhr schrieb M Singh :

 Thanks for your answer Xingcan.
Just to clarify - if the characteristic is set to IngestionTime or 
ProcessingTime, the event time triggers will be ignored and not fire.
Mans
On Tuesday, July 9, 2019, 04:32:00 PM EDT, Xingcan Cui  
wrote:  
 
 Yes, Mans. You can use both processing-time and event-time timers if you set 
the time characteristic to event-time. They'll be triggered by their own time 
semantics, separately. (actually there’s no watermark for processing time)
Cheers,Xingcan

On Jul 9, 2019, at 11:40 AM, M Singh  wrote:
 Thanks Yun for your answers.
Does this mean that we can use processing and event timers (in processors or 
triggers) regardless of the time characteristic ?  Also, is possible to use 
both together and will they both fire at the appropriate watermarks for 
processing and event times ?  
Mans
On Tuesday, July 9, 2019, 12:18:30 AM EDT, Yun Gao  
wrote:  
 
 Hi,    For the three questions,  1. The processing time timer will be trigger. 
IMO you may think the processing time timer as in parallel with the event time 
timer. They are processed separately underlying. The processing time timer will 
be triggered according to the realistic time.  2. I'am not very clear on how to 
changed later in the application. Do you mean call 
`StreamExecutionEnvironment#setStreamTimeCharacteristics` multiple times ? If 
so, then the last call will take effect for all the operators before or after 
the last call, since the setting will only take effect in 
`StreamExecutionEnvironment#execute`.  3. 'assignTimeStampAndWatermark' will 
change the timestamp of the record. IMO you may think each record contains a 
timestamp field, and the filed is set when ingesting, but 
'assignTimeStampAndWatermark' will change the value of this field, so the 
following operators relying on the timestamp will see the updated value.
Best,Yun



--From:M Singh 
Send Time:2019 Jul. 9 (Tue.) 09:42To:User 
Subject:Apache Flink - Relation between stream time 
characteristic and timer triggers
Hi:
I have a few questions about the stream time characteristics:
1. If the time characteristic is set to TimeCharacteristic.EventTime, but the 
timers in a processor or trigger is set using registerProcessingTimeTimer (or 
vice versa), then will that timer fire ?  
2.  Once the time character is set on the stream environment, and changed later 
in the application, which one is applied, the first one or the last one ?
3.  If the stream time characteristic is set to IngestionTime, then is there 
any adverse effect of assigning the timestamp using  
assignTimeStampAndWatermark to a stream later in the application ?
Thanks

  

  
  
  

Apache Flink - Side output time semantics for DataStream

2019-07-13 Thread M Singh
Hi:
I wanted to find out what is the timestamp associated with the elements of a 
stream side output with different stream time characteristics.
Thanks
Man 

  1   2   >