Re: Flink Taskmanager failure recovery and large state

2021-04-07 Thread Yaroslav Tkachenko
Hi Dhanesh,

Thanks for the recommendation! I'll try it out.


Re: Flink Taskmanager failure recovery and large state

2021-04-07 Thread dhanesh arole
Hi Yaroslav,

We faced similar issues in our large stateful stream processing job. I had
asked a question about it on the user mailing list a few days back. Based
on the reply to my question, we figured that this happens when a task
manager has just come back online and is trying to rebuild / restore its
state, but meanwhile another task manager gets restarted or killed. In this
situation the job manager cancels the job, and as a result all task
managers start cancelling the tasks they are currently running. As part of
the cancellation flow, the channel buffer through which the Flink TM writes
state to disk gets closed, while state rebuilding is still going on
concurrently using that same channel buffer. This causes the
ClosedChannelException.

As a solution to this problem, we increased *akka.ask.timeout* to 10m. This
gives task managers enough room to wait for RPC responses from other task
managers during restart. As a result, a TM becomes more lenient about
marking another TM as failed and cancelling the job in the first place.
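
For reference, the flink-conf.yaml change is small (10 minutes is the value
that worked for us; treat it as a starting point rather than a
recommendation):

# Default is 10 s; a larger value keeps TMs from giving up on each other
# while they are busy restoring large state.
akka.ask.timeout: 10 min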

-
Dhanesh Arole




Re: Flink Taskmanager failure recovery and large state

2021-04-06 Thread Robert Metzger
Hey Yaroslav,

GCS is a somewhat popular filesystem that should work fine with Flink.

It seems that the initial scale of a bucket is 5000 read requests per
second (https://cloud.google.com/storage/docs/request-rate); your job
should be hitting roughly the same rate (depending on how fast your job
restarts in the restart loop).

You could try to tweak the GCS configuration parameters, such as increasing
"fs.gs.http.read-timeout" and "fs.gs.http.max.retry" (see
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
for all available options).
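
As a sketch (this assumes the GCS connector picks up Hadoop configuration
from a core-site.xml on the classpath / HADOOP_CONF_DIR; the values are
illustrative, not tuned recommendations):

<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.gs.http.read-timeout</name>
    <value>60000</value> <!-- HTTP read timeout in milliseconds -->
  </property>
  <property>
    <name>fs.gs.http.max.retry</name>
    <value>20</value> <!-- maximum number of retries per HTTP request -->
  </property>
</configuration>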


The "ExecutionGraphException: The execution attempt
6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it
should not cause the restarts.



Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK Flink does not retry if downloading the checkpoint from storage
fails. On the other hand, the FileSystem already has its own retry
mechanism, so I think there is no need for Flink to retry.
I am not very sure, but from the log it seems that the GCS retry is
interrupted for some reason. So I think we could get more insight if we
could find the first failure cause.

Best,
Guowei



Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Yaroslav Tkachenko
Hi Guowei,

I thought Flink could support any HDFS-compatible object store, like the
majority of Big Data frameworks do. So we just added the
"flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
dependencies to the classpath; after that, using the "gs" prefix seems to
be possible:

state.checkpoints.dir: gs:///flink-checkpoints
state.savepoints.dir: gs:///flink-savepoints
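
Concretely, the image setup looks roughly like this (a sketch only: the jar
names, versions and paths are illustrative and need to match the artifacts
you actually use):

# Dockerfile (sketch)
FROM flink:1.12.0-scala_2.12
# Both jars go onto Flink's classpath so the "gs://" scheme can be resolved.
COPY flink-shaded-hadoop-2-uber-<version>.jar /opt/flink/lib/
COPY gcs-connector-latest-hadoop2.jar /opt/flink/lib/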

And yes, I noticed that retry logging too, but I'm not sure whether it's
implemented on the Flink side or on the GCS connector side; I probably need
to dive deeper into the source code. And if it's implemented on the GCS
connector side, will Flink wait for all the retries? That's why I asked
about a potential timeout on the Flink side.

The JM log doesn't have much besides what I already posted. It's hard for
me to share the whole log, but the RocksDB initialization part may be
relevant:

16:03:41.987 [cluster-io-thread-3] INFO
 org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
configure application-defined state backend:
RocksDBStateBackend{checkpointStreamBackend=File State Backend
(checkpoints: 'gs:///flink-checkpoints', savepoints:
'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
1048576), localRocksDbDirectories=[/rocksdb],
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
predefined options: FLASH_SSD_OPTIMIZED.
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
state.backend.rocksdb.block.blocksize=16 kb,
state.backend.rocksdb.block.cache-size=64 mb}}.
16:03:41.988 [cluster-io-thread-3] INFO
 org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
state backend: RocksDBStateBackend{checkpointStreamBackend=File State
Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
1048576), localRocksDbDirectories=[/rocksdb],
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
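
The state backend is application-defined (as the log says), but in
flink-conf.yaml terms it corresponds roughly to the following
(reconstructed from the log above, bucket paths redacted; key names are
from memory, so double-check them against the 1.12 docs):

state.backend: rocksdb
state.checkpoints.dir: gs:///flink-checkpoints
state.savepoints.dir: gs:///flink-savepoints
state.backend.incremental: true
state.backend.rocksdb.localdir: /rocksdb
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.block.blocksize: 16 kb
state.backend.rocksdb.block.cache-size: 64 mb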

Thanks!


Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK there is no official GCS FileSystem support in Flink. Did you
implement the GCS support yourself?
Would you like to share the whole JM log?

BTW: from the following log, I think the implementation already has some
retry mechanism:
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries
for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d

Best,
Guowei



Flink Taskmanager failure recovery and large state

2021-03-31 Thread Yaroslav Tkachenko
Hi everyone,

I'm wondering if people have experienced issues with Taskmanager failure
recovery when dealing with a lot of state.

I'm using Flink 1.12.0 on Kubernetes, with the RocksDB backend and GCS for
savepoints and checkpoints, and ~150 task managers with 4 slots each.
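
The slot side of that setup is just the standard configuration (a sketch;
the job parallelism itself is set on the job):

# ~150 TM pods with 4 slots each, so roughly 600 slots in total
taskmanager.numberOfTaskSlots: 4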

When I run a pipeline without much state and kill one of the taskmanagers,
it takes a few minutes to recover (I see a few restarts), but eventually,
once a replacement taskmanager is registered with the jobmanager, things go
back to healthy.

But when I run a pipeline with a lot of state (1TB+) and kill one of the
taskmanagers, the pipeline never recovers, even after the replacement
taskmanager has joined. It just enters an infinite loop of restarts and
failures.

On the jobmanager, I see an endless loop of state transitions: RUNNING
-> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
It stays in RUNNING for a few seconds, but then transitions into FAILED
with a message like this:


22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - 
(569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection reset by peer (connection to '
10.30.10.53/10.30.10.53:45789')
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection reset by peer


Which, I guess, means a failed Taskmanager. And since there are then not
enough task slots to run the job, it goes into this endless loop again.
It's never the same Taskmanager that fails.



On the Taskmanager side, things look more interesting. I see a variety of
exceptions:


org.apache.flink.runtime.taskmanager.Task -  (141/624)#7
(6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.


also


WARNING: Failed read retry #1/10 for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
Sleeping...
java.nio.channels.ClosedByInterruptException
at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
Source)
at
java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
Source)
at
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
at java.base/java.io.DataInputStream.read(Unknown Source)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.base/java.io.InputStream.read(Unknown Source)
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
...


and


SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
retries for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [ (141/624)#7] ERROR
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
~[flink-dist_2.12-1.12.0.jar:1.12.0]


also


20:52:46.895 [ (141/624)#7] WARN
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
Exception while restoring keyed state backend for
KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...


and a few of


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download
data for state handles.
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: