Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread SHI Xiaogang
Hi

Thanks for bringing this up.

The design looks very nice to me in that
1. In the new per-job mode, we don't need to compile user programs in the
client and can directly run user programs with user jars. That makes
resource isolation in multi-tenant platforms easier and is much safer.
2. The execution of user programs can be unified across session and per-job
modes. In session mode, user jobs are submitted via a remote ClusterClient,
while in per-job mode they are submitted via a local ClusterClient.

Regards,
Xiaogang

tison wrote on Wed, Oct 30, 2019 at 3:30 PM:

> (CC'ing the user list because I think users may have ideas on how per-job mode
> should look)
>
> Hi all,
>
> In the discussion about Flink on k8s[1] we encountered a problem: opinions
> diverge on how the so-called per-job mode should work. This thread is aimed at
> starting a dedicated discussion about per-job semantics and how to implement them.
>
> **The AS IS per-job mode**
>
> * In standalone deployment, we bundle the user jar with the Flink jar, retrieve
> the very first JobGraph produced by the user program on the classpath,
> and then start a Dispatcher preconfigured with this JobGraph, which
> launches it as a "recovered" job.
>
> * In YARN deployment, we accept the submission via CliFrontend, extract
> the very first JobGraph from the submitted user program, serialize
> the JobGraph and upload it to YARN as a resource. When the AM starts, it
> retrieves the JobGraph from the resource and starts a Dispatcher with this JobGraph
> preconfigured; the rest is the same as above.
>
> Specifically, in order to support multi-part jobs, a YARN deployment
> configured as "attached" starts a session cluster, drives the program to completion,
> and shuts down the cluster once the job has finished.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The two
> major ones are: 1. only the very first JobGraph from the user program is respected;
> 2. the job is compiled on the client side.
>
> 1. Only the very first JobGraph from the user program is respected
>
> There is already an issue about this topic[2]. Since we extract the JobGraph from
> the user program by hijacking Environment#execute, we actually abort any execution
> after the first call to #execute. It also surprises users, time and again, that
> any logic they write after that point is possibly never executed. The underlying
> problem is the semantics of "job" from Flink's perspective. I'd like to say
> that the current implementation of "per-job" is actually "per-job-graph". However,
> in practice, since we support jar submission, it is "per-program" semantics
> that users want.
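>
> As a minimal sketch (assuming the DataStream API; the class, the job names and
> the data are made up for illustration), consider a user program that calls
> execute twice. Under the current per-job mode only the first JobGraph is
> extracted and run:
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class TwoJobsProgram {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.fromElements(1, 2, 3).print();
>         env.execute("first job");   // only this JobGraph is respected
>
>         // everything below is silently skipped because execution is aborted
>         // after the first call to #execute
>         env.fromElements(4, 5, 6).print();
>         env.execute("second job");
>     }
> }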
>
> 2. The job is compiled on the client side
>
> Well, standalone deployment is not affected. But in YARN deployment, we
> compile the job and build the JobGraph on the client side, and then upload it to YARN.
> This approach, however, somewhat breaks isolation. We have observed user
> programs whose exception handling logic calls System.exit in the main
> method, so compiling the job can exit the whole client process at once.
> This is a critical problem if we manage multiple Flink jobs on a single
> platform: in that case, it shuts down the whole service.
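>
> A minimal sketch of the pattern (assuming the DataStream API; the names and
> the failure handling are made up for illustration):
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class ExitingProgram {
>     public static void main(String[] args) {
>         try {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>             env.fromElements("a", "b").print();
>             env.execute("user job");
>         } catch (Exception e) {
>             // When the platform runs this main method in its own JVM to extract
>             // the JobGraph, this call terminates the whole client process.
>             System.exit(1);
>         }
>     }
> }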
>
> Besides, I have been asked many times why per-job mode doesn't run
> "just like" session mode but with a dedicated cluster. This suggests that the
> current implementation does not match users' expectations.
>
> **Proposal**
>
> In order to provide a "per-program" mode which acts "just like" session
> mode but with a dedicated cluster, I propose the workflow below. It acts like
> starting a driver on the cluster, but it is not a general driver solution as proposed
> here[3]; the main purpose of the workflow below is to provide "per-program"
> semantics.
>
> *From CliFrontend*
>
> 1. CliFrontend receives the submission, gathers all configuration, and starts a
> corresponding ClusterDescriptor.
>
> 2. The ClusterDescriptor deploys a cluster with ProgramClusterEntrypoint as the
> main class, shipping resources including the user program.
>
> 3. ProgramClusterEntrypoint#main starts components including a standalone
> Dispatcher, configures the user program to use an RpcClusterClient,
> and then invokes the main method of the user program.
>
> 4. RpcClusterClient acts like MiniClusterClient: it is able to submit the
> JobGraph once the leader has been elected, so that we neither fall back to
> round-robin nor fail the submission because there is no leader yet.
>
> 5. Whether or not the job result is delivered depends on the user program's logic,
> since we can already get a JobClient from execute. ProgramClusterEntrypoint exits
> once the user program exits and all globally submitted jobs have terminated.
>
> This approach fits the direction of FLIP-73, because the strategy of starting an
> RpcClusterClient can be regarded as a special Executor. After
> ProgramClusterEntrypoint#main starts the cluster, it generates and passes
> configuration to the user program, so that when the Executor is generated, it
> knows to use an RpcClusterClient for submission and knows the address of the
> Dispatcher.
>
> **Compatibility**
>
> 

Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread SHI Xiaogang
Congratulations!

Regards,
Xiaogang

Guowei Ma wrote on Wed, Sep 11, 2019 at 7:07 PM:

> Congratulations Zili !
>
> Best,
> Guowei
>
>
> Fabian Hueske wrote on Wed, Sep 11, 2019 at 7:02 PM:
>
>> Congrats Zili Chen :-)
>>
>> Cheers, Fabian
>>
>> On Wed, Sep 11, 2019 at 12:48 PM, Biao Liu wrote:
>>
>>> Congrats Zili!
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
>>>
 Congratulations!

 ---
 Oytun Tez

 *M O T A W O R D*
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com


 On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:

> Congratulations!
>
>
> Best,
>
> Jiayi Liao
>
>  Original Message
> *Sender:* Till Rohrmann
> *Recipient:* dev; user
> *Date:* Wednesday, Sep 11, 2019 17:22
> *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
>
> Hi everyone,
>
> I'm very happy to announce that Zili Chen (some of you might also
> know him as Tison Kun) accepted the offer of the Flink PMC to become a
> committer of the Flink project.
>
> Zili Chen has been an active community member for almost 16 months
> now. He helped push the FLIP-6 effort over the finish line, ported a lot
> of legacy code tests, removed a good part of the legacy code, contributed
> numerous fixes, is involved in the refactoring of Flink's client API, drives
> the refactoring of Flink's HighAvailabilityServices, and much more. Zili
> Chen has also helped the community with PR reviews, by reporting Flink issues,
> answering user mails, and being very active on the dev mailing list.
>
> Congratulations Zili Chen!
>
> Best, Till
> (on behalf of the Flink PMC)
>



Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread SHI Xiaogang
Hi Dadashov,

You may have a look at the method YarnResourceManager#onContainersAllocated,
which launches containers (via NMClient#startContainer) after they
are allocated.
The launching is performed in the main thread of YarnResourceManager and
is synchronous/blocking. Consequently, the containers are launched
one by one.
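
A simplified illustration of that pattern (not the actual Flink code; the
createLaunchContext helper is a hypothetical placeholder):

import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

class BlockingContainerLauncher {
    // Containers start one by one because startContainer blocks until the
    // NodeManager has accepted each launch request.
    static void launchAll(NMClient nmClient, List<Container> containers) throws Exception {
        for (Container container : containers) {
            ContainerLaunchContext ctx = createLaunchContext(container);
            nmClient.startContainer(container, ctx);
        }
    }

    // Placeholder: a real launch context carries the command, environment and local resources.
    static ContainerLaunchContext createLaunchContext(Container container) {
        return ContainerLaunchContext.newInstance(null, null, null, null, null, null);
    }
}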

Regards,
Xiaogang

Elkhan Dadashov wrote on Sat, Aug 31, 2019 at 2:37 AM:

> Thanks everyone for the valuable input and for sharing your experience in
> tackling the issue.
>
> Regarding the suggestions:
> - We provision some common jars in all cluster nodes *-->* but this
> requires depending on the Infra Team's schedule for handling/updating the common jars.
> - Making the uberjar slimmer *-->* tried even with a 200 MB uberjar (half
> the size); it did not improve much. Only 100 containers could be started in time,
> but then we receive:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1566422713305 found 1566422560552
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>
>
> - It would be nice to see FLINK-13184
> <https://issues.apache.org/jira/browse/FLINK-13184>, but the expected
> version it will land in is 1.10.
> - Increasing the replication factor --> it would be nice to have a Flink config for
> setting the replication factor only for Flink job jars, but not for the output. It
> is also challenging to set a replication factor for a not-yet-existing directory;
> the new files will have the default replication factor. Will explore the HDFS cache
> option.
>
> Maybe another option could be:
> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
> jars from already started TaskManagers in a P2P fashion, so as not to be
> bottlenecked by HDFS replication.
>
> A Spark job with the exact same jar size and 800 executors, without any tuning, can
> start without any issue on the same cluster in less than a minute.
>
> *Further questions:*
>
> *@ SHI Xiaogang:*
>
> I see that all 800 requests are sent concurrently:
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 793.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 794.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
> ...
>
> Can you please elaborate on the part "As containers are launched and stopped
> one after another"? Any pointer to a class/method in Flink?
>
> *@ Zhu Zhu:*
>
> Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
>
> We intend to use a Flink real-time pipeline for replay from Hive/HDFS
> (an offline source), so as to have one single pipeline for both batch and
> real-time. So for a batch Flink job, the containers will be released once the
> job is done.
> I guess your job is a real-time Flink job, so you can share the jars from
> already long-running jobs.
>
> Thanks.
>
>
> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:
>
>> I can think of 2 approaches:
>>
>> 1. Allow Flink to specify the replication factor of the submitted uber jar.
>> 2. Allow Flink to specify a config flink.yarn.lib that points to all the
>> Flink-related jars hosted on HDFS. This way users don't need to build
>> and submit a fat uber jar every time, and the replication of those Flink jars
>> hosted on HDFS can also be specified separately.
>>
>>
>>
>> Till Rohrmann wrote on Fri, Aug 30, 2019 at 3:33 PM:
>>
>>> For point 2 there already exists a JIRA issue [1] and a PR. I hope that
>>> we can merge it during this release cycle.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>
>>> Cheers,
>>> Till

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-29 Thread SHI Xiaogang
Hi Dadashov,

We have faced similar problems in our production clusters.

Currently, both the launching and the stopping of containers are performed in the
main thread of YarnResourceManager. As containers are launched and stopped one
after another, it usually takes a long time to bootstrap large jobs. Things
get worse when some NodeManagers get lost: YARN will retry many times to
communicate with them, leading to heartbeat timeouts of TaskManagers.

The following are some efforts we have made to help Flink deal with large jobs.

1. We provision some common jars on all cluster nodes and ask our users not
to include these jars in their uberjars. When containers bootstrap, these
jars are added to the classpath via JVM options. That way, we can
effectively reduce the size of the uberjars.

2. We deploy asynchronous threads to launch and stop containers in
YarnResourceManager. The bootstrap time can be significantly reduced when
launching a large number of containers. We'd like to contribute this to the
community very soon (a minimal sketch of ideas 2 and 3 follows after this list).

3. We deploy a timeout timer for each launching container. If a TaskManager
does not register in time after its container has been launched, a
new container is allocated and launched. That leads to a certain
waste of resources, but it reduces the effects caused by slow or
problematic nodes.
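
The sketch below illustrates the pattern behind ideas 2 and 3 under some
assumptions: it is not the code we plan to contribute, the pool size and timeout
are arbitrary, and isTaskManagerRegistered/requestReplacementContainer are
hypothetical hooks that would talk to the ResourceManager in a real integration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

class AsyncContainerLauncher {
    // Launch containers from a thread pool instead of the ResourceManager main thread.
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(16);
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final NMClient nmClient;
    private final long registrationTimeoutMs;

    AsyncContainerLauncher(NMClient nmClient, long registrationTimeoutMs) {
        this.nmClient = nmClient;
        this.registrationTimeoutMs = registrationTimeoutMs;
    }

    void launch(Container container, ContainerLaunchContext ctx) {
        launcherPool.submit(() -> {
            try {
                // The blocking call now runs off the main thread, so slow or lost
                // NodeManagers no longer delay the launch of other containers.
                nmClient.startContainer(container, ctx);
            } catch (Exception e) {
                requestReplacementContainer(container);
                return;
            }
            // Idea 3: if no TaskManager has registered for this container in time,
            // give up on it and ask for a new one.
            timer.schedule(() -> {
                if (!isTaskManagerRegistered(container)) {
                    requestReplacementContainer(container);
                }
            }, registrationTimeoutMs, TimeUnit.MILLISECONDS);
        });
    }

    // Hypothetical hooks.
    boolean isTaskManagerRegistered(Container container) { return true; }
    void requestReplacementContainer(Container container) { }
}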

Now the community is considering a refactoring of the ResourceManager. I
think that will be the right time to improve its efficiency.

Regards,
Xiaogang

Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM:

> Dear Flink developers,
>
> I am having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The YARN queue is empty, and 800 containers are allocated
> almost immediately by the YARN RM.*
>
> It takes a very long time until all 800 nodes (NodeManagers) have downloaded the
> uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What is the recommended way of handling a 400MB+ uberjar with 800+
> containers?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, I am facing this
> exception around 15 minutes after the job was kicked off:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


Re: S3 recovery and checkpoint directories exhibit explosive growth

2017-07-16 Thread SHI Xiaogang
Hi Prashantnayak

Thanks a lot for reporting this problem. Can you provide more details so
that we can address it?

I am guessing the master has to delete too many files when a checkpoint is
subsumed, which is very common in our cases. The number of files in the
recovery directory will increase if the master cannot delete these files in
time. This usually happens when the checkpoint interval is very small and the
degree of parallelism is very large.
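
If that turns out to be the cause, one mitigation is to give the master more time
between checkpoints. A minimal sketch (assuming a streaming job; the interval and
pause values are arbitrary examples, not recommendations):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 2 minutes and require at least 1 minute between the end
        // of one checkpoint and the start of the next, so subsumed files can be cleaned up.
        env.enableCheckpointing(120_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000L);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-interval-example");
    }
}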

Regards,
Xiaogang


2017-07-15 0:31 GMT+08:00 Stephan Ewen :

> Hi!
>
> I am looping in Stefan and Xiaogang who worked a lot in incremental
> checkpointing.
>
> Some background on incremental checkpoints: incremental checkpoints store
> "pieces" of the state (RocksDB SSTables) that are shared between
> checkpoints. Hence they naturally use more files than non-incremental
> checkpoints.
>
> You could help us understand this with a few more details:
>   - Does it only occur with incremental checkpoints, or also with regular
> checkpoints?
>   - How many checkpoints do you retain?
>   - Do you use externalized checkpoints?
>   - Do you use a highly-available setup with ZooKeeper?
>
> Thanks,
> Stephan
>
>
>
> On Thu, Jul 13, 2017 at 10:43 PM, prashantnayak <
> prash...@intellifylearning.com> wrote:
>
>>
>> To add one more data point... it seems like the recovery directory is
>> somehow the bottleneck: if we delete the recovery directory and restart the
>> job manager, it comes back and is responsive.
>>
>> Of course, we lose all jobs, since none can be recovered... and that is of
>> course not ideal.
>>
>> So the question seems to be why the recovery directory grows exponentially
>> in the first place.
>>
>> I can't imagine we're the only ones to see this... or we must be configuring
>> something wrong while testing Flink 1.3.1.
>>
>> Thanks for your help in advance
>>
>> Prashant
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14271.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Checkpointing with RocksDB as statebackend

2017-06-28 Thread SHI Xiaogang
Hi Vinay,

We have observed a similar problem before. We found that RocksDB keeps a lot of
index and filter blocks in memory. As the state size grows (in our
cases, most states are only cleared in windowed streams), these blocks will
occupy much more memory.

We now let RocksDB put these blocks into the block cache (via
setCacheIndexAndFilterBlocks) and limit the memory usage of RocksDB with the
block cache size. Performance may be degraded, but TMs can avoid being
killed by YARN for overusing memory.
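
For reference, a minimal sketch of how this can be done with the RocksDB state
backend's OptionsFactory (the cache size below is an arbitrary example value):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Keep index and filter blocks inside the block cache so their memory usage is
// bounded by the cache size instead of growing with the state size.
public class BoundedMemoryOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCacheSize(256 * 1024 * 1024)    // 256 MB block cache
                .setCacheIndexAndFilterBlocks(true);     // account index/filter blocks in the cache
        return currentOptions.setTableFormatConfig(tableConfig);
    }
}

The factory can then be registered on the backend, e.g. via
rocksDbBackend.setOptions(new BoundedMemoryOptionsFactory()).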

This may not be the same cause of your problem, but it may be helpful.

Regards,
Xiaogang

2017-06-28 23:26 GMT+08:00 Vinay Patil :

> Hi Aljoscha,
>
> I am using an event-time-based tumbling window where allowedLateness is
> set to Long.MAX_VALUE, and I have a custom trigger similar to the one in 1.0.3,
> where Flink was not discarding late elements (we have discussed this
> scenario before).
>
> The watermark is working correctly because I have validated the records
> earlier.
>
> I suspected that the RocksDB state backend was not set, but in the logs
> I can clearly see that RocksDB is initialized successfully, so that should
> not be an issue.
>
> Also, I have not changed any major code since the last performance test I
> had done.
>
> The snapshot I attached is of off-heap memory; I have only assigned
> 12GB of heap memory per TM.
>
>
> Regards,
> Vinay Patil
>
> On Wed, Jun 28, 2017 at 8:43 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Just a quick question, because I’m not sure whether this came up in the
>> discussion so far: what kind of windows are you using? Processing
>> time/event time? Sliding Windows/Tumbling Windows? Allowed lateness? How is
>> the watermark behaving?
>>
>> Also, the latest memory usage graph you sent, is that heap memory or
>> off-heap memory or both?
>>
>> Best,
>> Aljoscha
>>
>> > On 27. Jun 2017, at 11:45, vinay patil  wrote:
>> >
>> > Hi Stephan,
>> >
>> > I am observing a similar issue with Flink 1.2.1.
>> >
>> > The memory is continuously increasing and data is not getting flushed to
>> > disk.
>> >
>> > I have attached the snapshot for reference.
>> >
>> > Also, the data processed so far is only 17GB, yet more than 120GB of memory is
>> > being used.
>> >
>> > Is there any change w.r.t. RocksDB configurations?
>> >
>> > (image: TM_Memory_Usage.png)
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p14013.html
>> > Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>>
>


Re: Checkpoints very slow with high backpressure

2017-05-31 Thread SHI Xiaogang
Hi rhashmi

We are also experiencing slow checkpoints when there is back pressure.
It seems there is no good method to handle back pressure right now.

We work around it by setting a larger checkpoint timeout. The
default value is 10 minutes, but checkpoints usually take more time to complete
when there is back pressure. You can set the timeout via
`CheckpointConfig#setCheckpointTimeout()`.
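
For example (a sketch; the interval and timeout values here are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000L);
        // Allow checkpoints up to 30 minutes before they are aborted as timed out
        // (instead of the default 10 minutes).
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-timeout-example");
    }
}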

Regards,
Xiaogang



2017-06-01 5:36 GMT+08:00 rhashmi :

> So what is the resolution? Flink is consuming messages from Kafka. Flink went
> down about a day ago, so now Flink has to process 24 hours' worth of events.
> But I hit backpressure, and as of right now checkpoints are timing out. Is there
> any recommendation for how to handle this situation?
>
> It seems like triggers are also not firing, so no updates are being made to the
> downstream database.
>
> Is there a recommended approach to handling backpressure?
>
> Version: Flink 1.2.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13411.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: trying to externalize checkpoint to s3

2017-05-22 Thread SHI Xiaogang
Hi Sathi,

According to the URI format specification, "abc-checkpoint" is the host
name in the given URI and the path is null. Therefore, FsStateBackend is
complaining about the usage of the root directory.

Maybe "s3:///abc-checkpoint" ("///" instead of "//") is the URI that you
want to use. It will put all checkpoints under the path "/abc-checkpoint".
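
In flink-conf.yaml that would look as follows (a sketch mirroring the values
from your configuration quoted below):

state.backend: filesystem
state.checkpoints.dir: s3:///abc-checkpoint
state.backend.fs.checkpointdir: s3:///abc-checkpoint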

Regards,
Xiaogang


2017-05-23 9:34 GMT+08:00 Sathi Chowdhury :

> We are running Flink 1.2 in pre-production.
>
> I am trying to test checkpoints stored in an external location in S3.
>
>
>
> I have set these below in flink-conf.yaml
>
>
>
> state.backend: filesystem
>
> state.checkpoints.dir: s3://abc-checkpoint
>
> state.backend.fs.checkpointdir: s3://abc-checkpoint
>
>
>
> I get this failure in the JobManager log:
>
> java.lang.Exception: Cannot initialize File System State Backend with URI
> 's3://abc-checkpoint.
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:57)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createStateBackend(StreamTask.java:719)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:223)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Cannot use the root
> directory for checkpoints.
>
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> validateAndNormalizeUri(FsStateBackend.java:225)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackend.(FsStateBackend.java:153)
>
> at org.apache.flink.runtime.state.filesystem.
> FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:5
>
> Any clue? I thought that since I am using EMR, the Hadoop-to-S3 integration is
> already working.
>
> Thanks
> Sathi
>


Re: Question about start with checkpoint.

2017-05-21 Thread SHI Xiaogang
Hi Yunfan,

Jobs are supposed to restart correctly from both savepoints and checkpoints
with a different parallelism if only operator states and keyed states are
used. In cases where there exist unpartitionable states (e.g., those
produced by the Checkpointed interface), the job will fail to restart
if the parallelism is changed.

In Flink, both operator states and keyed states are described as
collections of objects, hence are partitionable. To be specific, operator
states are composed of a list of objects. When the parallelism changes,
these objects will be redistributed to the tasks evenly.
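
For example, an operator state declared as a ListState through the
CheckpointedFunction interface can be redistributed on rescaling, in contrast to
state produced by the old Checkpointed interface. A minimal sketch (the state
name and the counting logic are made up, and it assumes a Flink version where
OperatorStateStore#getListState is available):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Each parallel instance stores its count as a one-element list. On rescaling,
// the union of all list elements is redistributed across the new instances.
public class CountingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public void invoke(String value) throws Exception {
        count++;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long previous : checkpointedCount.get()) {
            count += previous;
        }
    }
}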

The assignment of keyed states follows a similar idea. The keyed states are
composed of a set of key groups. When the parallelism changes, these key
groups are also redistributed to the tasks. The restoring of keyed
states varies with the state backend settings. In Flink 1.2, the
RocksDB state backend will download all the key-value pairs in its key-group
range and insert them into a new RocksDB instance to recover the
states.

You can find more details about the scaling of keyed states and operator
states in the following links.
Dynamic Scaling: Key Groups

FLIP-8: Rescalable Non-Partitioned State


I hope this information helps you.

Regards
Xiaogang


2017-05-21 11:43 GMT+08:00 yunfan123 :

> How exactly does this work?
> For example, I save my state using RocksDB + HDFS.
> When I change the parallelism of my job, can starting from a checkpoint still work?
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-about-start-with-checkpoint-tp13234.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>