Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-22 Thread Alexey Trenikhun
Since it is necessary to cancel with a savepoint and resume from that savepoint, 
it is not possible to use a Deployment (otherwise the JobManager pod would restart 
from the same savepoint after a crash), so we need to use a Job. But in that case, 
if the Job pod crashes, who will start a new instance of it? It sounds like HA 
with Kubernetes is currently not achievable unless some controller is used to 
manage the JobManager. Am I right?


From: Chesnay Schepler 
Sent: Saturday, August 22, 2020 12:58 AM
To: Alexey Trenikhun ; Piotr Nowojski 
Cc: Flink User Mail List 
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

If, and only if, the cluster-id and JobId are identical, then the JobGraph will 
be recovered from ZooKeeper.

On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure that I understand your statement about "the HaServices are only being 
given the JobGraph"; it seems HighAvailabilityServices#getJobGraphStore provides a 
JobGraphStore, and potentially an implementation of 
JobGraphStore#recoverJobGraph(JobID jobId) for this store could build a new graph 
from the jar rather than read the stored graph from ZooKeeper?

Also, if there is a single job with the same job-id (job cluster), will the 
JobGraph of the failed job be overwritten by the new one, which will have the 
same job-id?


From: Chesnay Schepler 
Sent: Friday, August 21, 2020 12:16 PM
To: Alexey Trenikhun ; Piotr Nowojski 

Cc: Flink User Mail List 
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA should be 
irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the 
previous job; they will be treated as separate things.

However, you will likely end up with stale data in ZooKeeper (the JobGraph of 
the failed job).

On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph from the 
jar instead of reading it from ZK state? Any hints? I have a feeling that 
rebuilding the JobGraph from the jar is a more resilient approach, with less 
chance of mistakes during an upgrade.

Thanks,
Alexey


From: Piotr Nowojski 
Sent: Thursday, August 20, 2020 7:04 AM
To: Chesnay Schepler 
Cc: Alexey Trenikhun ; Flink User Mail 
List 
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

Thank you for the clarification, Chesnay, and sorry for the incorrect previous 
answer.

Piotrek

On Thu, Aug 20, 2020 at 15:59, Chesnay Schepler <ches...@apache.org> wrote:
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the 
deployment, the cluster will recover the previous JobGraph (assuming you aren't 
changing the ZooKeeper configuration).

If you wish to update the job, then you should cancel it (along with creating a 
savepoint), which will clear the ZooKeeper state, and then create a new deployment.

On 20/08/2020 15:43, Piotr Nowojski wrote:
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as I 
know the JobGraph is never stored in ZK; it's always recreated from the job's 
JAR. So you should be able to upgrade the job by replacing the JAR with a newer 
version, as long as the operator UIDs are the same before and after the upgrade 
(so that operator state can be matched across the upgrade).
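
For example, the UIDs can be pinned explicitly on each stateful operator (a 
minimal, illustrative sketch; the pipeline itself is made up):

import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000)

    env
      .fromElements(1, 2, 3)             // placeholder source
      .map(_ * 2).uid("double-values")   // keep this UID stable across upgrades
      .print().uid("print-sink")

    env.execute("uid-example")
  }
}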

Best, Piotrek

On Thu, Aug 20, 2020 at 06:34, Alexey Trenikhun <yen...@msn.com> wrote:
Hello,

Let's say I run a Flink Job cluster with persistent storage and ZooKeeper HA on 
k8s, with a single JobManager, and use externalized checkpoints. When the JM 
crashes, k8s will restart the JM pod, and the JM will read the JobId and JobGraph 
from ZK and restore from the latest checkpoint. Now let's say I want to upgrade 
the job binary: I delete the deployments and create new deployments referring to 
a newer image. Will the JM still read the JobGraph from ZK, or will it create a 
new one from the new job jar?

Thanks,
Alexey
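
For context, externalized checkpoints as referred to above are typically enabled 
roughly like this (a minimal sketch; the interval and the retention mode are just 
examples):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // take a checkpoint every 60 s
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep checkpoints when the job is cancelled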





Re: SDK vs Connectors

2020-08-22 Thread Yun Gao
Hi Prasanna,

   1) Semantically both a) and b) would be OK. If the custom sink can be chained 
with the map operator (I assume the map operator is the "Processing" in the 
graph), there should also be little difference physically. If they cannot be 
chained, then writing a custom sink adds another pass of network transfer, but 
the custom sink runs in a different thread, so more computational resources can 
be exploited.
   2) To achieve at-least-once, you need to implement the "CheckpointedFunction" 
interface and ensure that all pending data is flushed to the outside system when 
snapshotting state (a minimal sketch follows below the list). Once a checkpoint 
succeeds, the data before it will not be replayed after a failover, so those 
records need to be written out before the checkpoint completes.
   3) From my side I don't think there are significant disadvantages to writing 
custom sink functions.
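
A rough sketch of the sink in point 2, assuming a generic publish callback in 
place of the actual SNS SDK call (the class name and callback are made up for 
illustration; adapt batching and error handling to your needs):

import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Buffers incoming records and flushes them to the external system before each
// checkpoint completes. This gives at-least-once delivery: everything up to the
// last successful checkpoint has been published, and everything after it is
// replayed from the source on failover.
class BufferingSnsSink(publish: String => Unit) extends RichSinkFunction[String]
    with CheckpointedFunction {

  private val buffer = new ConcurrentLinkedQueue[String]()

  override def invoke(value: String): Unit = {
    buffer.add(value)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Flush everything received before this checkpoint barrier.
    var record = buffer.poll()
    while (record != null) {
      publish(record)
      record = buffer.poll()
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Nothing to restore: records that were not yet flushed when a failure
    // happened are replayed by the source from the last successful checkpoint.
  }
}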

Best,
 Yun


--
Sender:Prasanna kumar
Date:2020/08/22 02:00:51
Recipient:user; 
Theme:SDK vs Connectors

Hi Team,

Following is the pipeline:
Kafka => Processing => SNS Topics.

Flink does not provide an SNS connector out of the box.

a) I implemented the above using the AWS SDK and published the messages in the 
map operator itself.
The pipeline is working well; I see messages flowing to the SNS topics.

b) Another approach would be to write a custom sink function and publish to SNS 
using the SDK in that stage.

Questions
1) What would be the primary difference between approaches a) and b)? Is there 
any significant advantage of one over the other?

2) Would the at-least-once guarantee still hold if we follow the above approach?

3) Would there be any significant disadvantages (or anything we need to be 
careful about) in writing our own custom sink functions?

Thanks,
Prasanna. 


Re: Flink checkpointing with Azure block storage

2020-08-22 Thread Boris Lublinsky
Thanks Yun,
I made it work, but now I want to set the appropriate config programmatically.
I can set state.checkpoints.dir with:

val fsStateBackend = new FsStateBackend(
  new URI("wasb://<container-name>@<account-name>.blob.core.windows.net/<path>"))
env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])

But I can't update the configuration to add the credentials 
(fs.azure.account.key.<account-name>.blob.core.windows.net: <account-key>), 
because getConfiguration is a private method. Any suggestions?
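
One thing you could try (a sketch only, not verified against Azure; whether the 
wasb file system factory picks these keys up in your setup is an assumption to 
verify): build a Configuration with the credentials yourself and hand it to 
FileSystem.initialize before the wasb:// path is first used. Account name and key 
below are placeholders.

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

// Register the Azure account key with Flink's file-system layer programmatically,
// so the wasb:// scheme can authenticate without a flink-conf.yaml entry.
val fsConf = new Configuration()
fsConf.setString(
  "fs.azure.account.key.<account-name>.blob.core.windows.net",
  "<account-key>")
FileSystem.initialize(fsConf)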




> On Aug 20, 2020, at 9:29 PM, Yun Tang  wrote:
> 
> Hi Boris
> 
> I think the official guide [1] should be enough to tell you how to configure this.
> However, I think your changes to flink-conf.yaml might not have taken effect, as 
> you have configured the state backend as 'filesystem' while the logs still say 
> "No state backend has been configured, using default (Memory / 
> JobManager) MemoryStateBackend".
> 
> You can check the log for "Loading configuration property" entries to see 
> whether your changes were actually loaded.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration
>  
> 
> 
> Best
> Yun Tang
> 
> From: Boris Lublinsky 
> Sent: Friday, August 21, 2020 7:18
> To: user 
> Subject: Re: Flink checkpointing with Azure block storage
>  
> To test it, I created a flink-conf.yaml file and put it in the resources 
> directory of my project.
> The file contains the following:
> 
> #==
> # Fault tolerance and checkpointing
> #==
> 
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # <class-name-of-factory>.
> #
> state.backend: filesystem
> 
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends.
> #
> state.checkpoints.dir: wasb://<container-name>@<account-name>.blob.core.windows.net/<path>
> 
> fs.azure.account.key.<account-name>.blob.core.windows.net: <account-key>
> 
> # Default target directory for savepoints, optional.
> #
> # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints 
> 
> 
> # Flag to enable/disable incremental checkpoints for backends that
> 
> Which should have produced an error,
> 
> But what I see is that it does not seem to take effect:
> 
> 
> 313 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been 
> configured, using default (Memory / JobManager) MemoryStateBackend (data in 
> heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
> 'null', asynchronous: TRUE, maxStateSize: 5242880)
> 3327 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb
>  for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
> 3329 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService
>   - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3 
> 
> 
> 
>> On Aug 20, 2020, at 5:14 PM, Boris Lublinsky wrote:
>> 
>> Is there somewhere a complete configuration example for such option?



Ververica Flink training resources

2020-08-22 Thread Piper Piper
Hi Flink community,

I have two questions regarding the Ververica Flink Training resources.

1. In the official Flink documentation, the hyperlinks to the GitHub sites
for the exercises in the "Learn Flink" section are not working. If
possible, please provide me with the correct links for the exercises.

2. The schema of the Taxi Fares dataset matches the old dataset
(nycTaxiFares.gz). However, the schema of the Taxi Ride dataset given on
the Ververica GitHub site does not seem to match the dataset in the old
file (nycTaxiRides.gz). Please advise.

Given Schema: rideId, taxiId, driverId, isStart, startTime, endTime,
startLon, startLat, endLon, endLat, passengerCnt

nycTaxiRides.gz sample line (after extracting to file
nycTaxiRides4): 6,START,2013-01-01 00:00:00,1970-01-01
00:00:00,-73.866135,40.771091,-73.961334,40.764912,6,201306,201306

Thank you!

Piper


Re: OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Pankaj Chand
Thank you, Arvid and Marco!

On Sat, Aug 22, 2020 at 2:03 PM Marco Villalobos 
wrote:

> Hi Pankaj,
>
> I highly recommend that you use OpenJDK version 11, because each JDK
> upgrade brings performance improvements, and because Oracle JDK and
> OpenJDK are based on the same code base. The main difference between
> Oracle and OpenJDK is branding and price.
>
>
> > On Aug 22, 2020, at 4:23 AM, Pankaj Chand 
> wrote:
> >
> > Hello,
> >
> > The documentation says that to run Flink, we need Java 8 or 11.
> >
> > Will JDK 11 work for running Flink, programming Flink applications as
> well as building Flink from source?
> >
> > Also, can we use Open JDK for the above three capabilities, or do any of
> the capabilities require Oracle JDK?
> >
> > Thanks,
> >
> > Pankaj
>
>


Re: OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Marco Villalobos
Hi Pankaj,

I highly recommend that you use OpenJDK version 11, because each JDK upgrade 
brings performance improvements, and because Oracle JDK and OpenJDK are based on 
the same code base. The main difference between Oracle and OpenJDK is branding 
and price.


> On Aug 22, 2020, at 4:23 AM, Pankaj Chand  wrote:
> 
> Hello,
> 
> The documentation says that to run Flink, we need Java 8 or 11.
> 
> Will JDK 11 work for running Flink, programming Flink applications as well as 
> building Flink from source?
> 
> Also, can we use Open JDK for the above three capabilities, or do any of the 
> capabilities require Oracle JDK?
> 
> Thanks,
> 
> Pankaj



Re: OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Arvid Heise
Hi Pankaj,

Yes, you can use all four of the mentioned JDKs for these three things.

When building Flink from source with Java 11, you may need to activate the
Java 11 profile in Maven (-Pjava11). If you just want to use Flink with
Java 11, you can also use Flink built with Java 8 (in fact, the official
Maven artifacts and the dist are still built with Java 8).

Best,

Arvid

On Sat, Aug 22, 2020 at 1:23 PM Pankaj Chand 
wrote:

> Hello,
>
> The documentation says that to run Flink, we need Java 8 or 11.
>
> Will JDK 11 work for running Flink, programming Flink applications as well
> as building Flink from source?
>
> Also, can we use Open JDK for the above three capabilities, or do any of
> the capabilities require Oracle JDK?
>
> Thanks,
>
> Pankaj
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


OpenJDK or Oracle JDK: 8 or 11?

2020-08-22 Thread Pankaj Chand
Hello,

The documentation says that to run Flink, we need Java 8 or 11.

Will JDK 11 work for running Flink, programming Flink applications as well
as building Flink from source?

Also, can we use Open JDK for the above three capabilities, or do any of
the capabilities require Oracle JDK?

Thanks,

Pankaj


Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-22 Thread Chesnay Schepler
If, and only if, the cluster-id and JobId are identical, then the 
JobGraph will be recovered from ZooKeeper.


On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure that I understand your statement about "the HaServices are 
only being given the JobGraph"; it seems 
HighAvailabilityServices#getJobGraphStore provides a JobGraphStore, and 
potentially an implementation of JobGraphStore#recoverJobGraph(JobID 
jobId) for this store could build a new graph from the jar rather than 
read the stored graph from ZooKeeper?


Also, if there is a single job with the same job-id (job cluster), will the 
JobGraph of the failed job be overwritten by the new one, which will have the 
same job-id?



*From:* Chesnay Schepler 
*Sent:* Friday, August 21, 2020 12:16 PM
*To:* Alexey Trenikhun ; Piotr Nowojski 


*Cc:* Flink User Mail List 
*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to 
the previous job; they will be treated as separate things.


However, you will likely end up with stale data in ZooKeeper (the 
JobGraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph 
from the jar instead of reading it from ZK state? Any hints? I have a 
feeling that rebuilding the JobGraph from the jar is a more resilient 
approach, with less chance of mistakes during an upgrade.


Thanks,
Alexey


*From:* Piotr Nowojski  


*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler  
*Cc:* Alexey Trenikhun  ; 
Flink User Mail List  


*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
Thank you for the clarification, Chesnay, and sorry for the incorrect 
previous answer.


Piotrek

On Thu, Aug 20, 2020 at 15:59, Chesnay Schepler wrote:


This is incorrect; we do store the JobGraph in ZooKeeper. If you
just delete the deployment, the cluster will recover the previous
JobGraph (assuming you aren't changing the ZooKeeper configuration).

If you wish to update the job, then you should cancel it (along
with creating a savepoint), which will clear the ZooKeeper state,
and then create a new deployment.

On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well),
but as far as I know the JobGraph is never stored in ZK. It's
always recreated from the job's JAR. So you should be able to
upgrade the job by replacing the JAR with a newer version, as
long as the operator UIDs are the same before and after the
upgrade (so that operator state can be matched across the upgrade).

Best, Piotrek

On Thu, Aug 20, 2020 at 06:34, Alexey Trenikhun <yen...@msn.com> wrote:

Hello,

Let's say I run a Flink Job cluster with persistent storage
and ZooKeeper HA on k8s, with a single JobManager, and use
externalized checkpoints. When the JM crashes, k8s will restart
the JM pod, and the JM will read the JobId and JobGraph from ZK
and restore from the latest checkpoint. Now let's say I want to
upgrade the job binary: I delete the deployments and create new
deployments referring to a newer image. Will the JM still read
the JobGraph from ZK, or will it create a new one from the new job jar?

Thanks,
Alexey