Re: Checkpoint failures due to other subtasks sharing the ChannelState file (Caused the Job to Stall)

2024-08-02 Thread Dhruv Patel
We have also enabled unaligned checkpoints. Could it be because of that? We
were experiencing slowness and intermittent packet loss when this issue
occurred.
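
For context, a minimal sketch (illustrative only, not our actual job code) of how unaligned checkpoints and a 1-minute checkpoint interval are typically enabled via Flink's CheckpointConfig:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute with exactly-once semantics.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Unaligned checkpoints persist in-flight buffers as channel state; the
        // shared ChannelState file mentioned in the error below is part of that data.
        cfg.enableUnalignedCheckpoints();
    }
}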

On Wed, Jul 31, 2024 at 7:43 PM Dhruv Patel  wrote:

> Hi Everyone,
>
> We are observing an interesting issue with continuous checkpoint
> failures in our job, causing events to not be forwarded through the
> pipeline. We saw a flood of the below log in all our task manager instances.
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The
> checkpoint was aborted due to exception of other subtasks sharing the
> ChannelState file.
>
> Can you help us figure out what the issue could be? It does not get
> fixed until we restart the jobs.
>
>
> Our setup is something like this.
>
> Flink Version: 1.18.1 (Session Mode)
> We use RocksDB without incremental checkpoint
> State size: ~ 2GB
> No memory issues in TMs
>
> Checkpointing frequency: Every 1 minute
>
>
>
> 2024-07-30 12:16:49.043 [AsyncOperations-thread-83] INFO
> o.a.flink.streaming.runtime.tasks.AsyncCheckpointRunnable  - KeyedProcess
> -> GroupRelationBuilderStream (23/60)#0 - asynchronous part of checkpoint
> 29749 could not be completed.
>
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was
> aborted due to exception of other subtasks sharing the ChannelState file.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:66)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The
> checkpoint was aborted due to exception of other subtasks sharing the
> ChannelState file.
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:298)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.failAndClearWriter(ChannelStateWriteRequestDispatcherImpl.java:212)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.handleCheckpointAbortRequest(ChannelStateWriteRequestDispatcherImpl.java:189)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:129)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:94)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:172)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:127)
>     ... 1 common frames omitted
> Caused by: java.util.concurrent.CancellationException: checkpoint aborted
> via notification
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:456)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:410)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$18(StreamTask.java:1406)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$20(StreamTask.java:1429)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229

Re: checkpoint upload thread

2024-08-01 Thread Yanfei Lei
Hi Enric,

Sorry for the confusion, I mean "It can be done theoretically, and it
depends on the specific implementation of the file system client in
fact."

I think there are two ways to let different tasks share a connection (in
other words, a socket):
1. Share one *Output Stream*;
2. Use different Output Streams, but have those OutputStreams share one
underlying socket (which is implemented by the specific file system client).

Back to Flink: currently, different tasks do not share output streams, but I
am not sure whether the underlying sockets are shared.
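
To make option 2 concrete, here is a toy sketch in plain Java (not Flink internals): two logical writers each hold their own buffered OutputStream, but both drain into one underlying socket connection.

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class SharedSocketSketch {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort());
             Socket accepted = server.accept()) {
            // One underlying connection ("socket") ...
            OutputStream shared = client.getOutputStream();
            // ... but two independent output streams, one per logical writer/task.
            BufferedOutputStream writerA = new BufferedOutputStream(shared);
            BufferedOutputStream writerB = new BufferedOutputStream(shared);
            writerA.write("state-of-task-A".getBytes());
            writerA.flush();
            writerB.write("state-of-task-B".getBytes());
            writerB.flush();
        }
    }
}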


Enric Ott <243816...@qq.com> wrote on Thu, Aug 1, 2024 at 10:04:

>
> Hi, Yanfei:
>   What do you mean by the word "possible" in the statement "it is possible to
> use the same connection for an operator chain"? That it can be done but is not
> applied in fact? Or that it is actually applied, but only with some probability?
>
>   Thanks.
>
>
> -- Original Message --
> From: "Yanfei Lei" ;
> Sent: Tuesday, July 30, 2024 5:15 PM
> To: "Enric Ott"<243816...@qq.com>;
> Cc: "user";
> Subject: Re: checkpoint upload thread
>
> Hi Enric,
>
> If I understand correctly, one subtask would use one
> `asyncOperationsThreadPool`[1,2], it is possible to use the same
> connection for an operator chain.
>
> [1] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L443
> [2] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L716
>
> Enric Ott <243816...@qq.com> wrote on Tue, Jul 30, 2024 at 11:11:
> >
> > Hi, Community:
> >   Does Flink upload states and in-flight buffers within the same
> > operator chain using the same connection (instead of one connection per
> > operator)?
>
>
>
> --
> Best,
> Yanfei



--
Best,
Yanfei


Re: checkpoint upload thread

2024-07-30 Thread Yanfei Lei
Hi Enric,

If I understand correctly, one subtask would use one
`asyncOperationsThreadPool`[1,2], it is possible to use the same
connection for an operator chain.

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L443
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L716

Enric Ott <243816...@qq.com> wrote on Tue, Jul 30, 2024 at 11:11:
>
> Hi, Community:
>   Does Flink upload states and in-flight buffers within the same operator chain
> using the same connection (instead of one connection per operator)?



-- 
Best,
Yanfei


Re: Checkpoint RMM

2023-11-27 Thread xiangyu feng
Hi Oscar,

> but we don't understand why this incremental checkpoint keeps increasing

AFAIK, when performing an incremental checkpoint, the RocksDBStateBackend
uploads the newly created SST files to remote storage. The total size of these
files is the incremental checkpoint size. However, the SST files generated by
RocksDB's compaction are not entirely determined by the new data ingested into
the state. RocksDB stores data as an LSM tree, which has space amplification:
the size of the files generated by compaction depends on the compaction
strategy and can be proportional to the overall size of the LSM tree.
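
If you want to observe this effect, one option (a sketch only; the metric option keys are taken from Flink's RocksDB native metrics documentation and only apply when the RocksDB state backend is used, so please verify them against your version) is to expose RocksDB's SST-file and live-data sizes and compare them with the reported incremental checkpoint size:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbMetricsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed option keys (check the RocksDB native metrics docs for your Flink version):
        // total size of all SST files vs. RocksDB's estimate of live (non-obsolete) data.
        conf.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");
        conf.setString("state.backend.rocksdb.metrics.estimate-live-data-size", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job as usual; the metrics then show up per column family
        // in the taskmanager metrics, alongside the checkpoint size metrics.
    }
}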

Hope this resolves your doubts.

Xiangyu Feng


Oscar Perez via user  wrote on Mon, Nov 27, 2023 at 19:55:

> Hi,
>
> We have a long running job in production and we are trying to understand
> the metrics for this job, see attached screenshot.
>
> We have enabled incremental checkpoint for this job and we use RocksDB as
> a state backend.
>
> When deployed from fresh state, the initial checkpoint size is about
> *2.41G*. I guess most of the contents come from the Table API and reading a
> bunch of topics from earliest.
>
> Few data regarding the graph:
>
> Full checkpoint size spans from *2.41G* on September 21st until *11.6G*
> on November 14th.
> The last checkpoint size (incremental checkpoint) goes from *232MB* on
> September 21st until *2.35GB* on November 14th. We take incremental
> checkpoints every 30 seconds.
> The time it takes to take a checkpoint goes from *1.66 seconds* on
> September 21st until *10.85 seconds* on November 14th.
>
> Few things we dont understand
>
> Why does the incremental checkpoint size keep increasing? My assumption would
> be that the deltas are linear and the incremental checkpoint size would remain
> around 230MB, but it keeps increasing over time until it reaches 2.35GB!
>
> The full checkpoint size does not completely make sense either. If each
> incremental checkpoint keeps increasing linearly, I would expect the full
> checkpoint size to increase much faster, as the full checkpoint size is
> the sum of all incremental checkpoints and we take an incremental
> checkpoint every 30 seconds.
>
> The time it takes to take a checkpoint correlates with the incremental
> checkpoint sizes. The bigger the incremental checkpoint size, the longer it
> takes to store it, but we don't understand why this incremental checkpoint
> keeps increasing. Is this something related to Table API internals?
>
> Thanks for any help that could be given!
> Regards,
> Oscar
>


Re: Checkpoint jitter?

2023-09-13 Thread Őrhidi Mátyás
Thanks folks, the changelog based approach looks promising for mitigating
the burst effect in our case.

Cheers,
Matyas

On Wed, Sep 13, 2023 at 7:33 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

>
> Hi Mátyás,
>
> Checkpoints are meant to be atomic in nature, i.e. everything is
> checkpointed at more or less the same time.
> What you can do in newer Flink versions is to enable the Changelog
> feature (see [1]), which spreads the actual I/O for writing checkpoint files
> over a longer period and keeps an additional changelog file with the
> running updates.
> What you get is a little more overall I/O but a quite flat I/O rate.
>
> Hope this helps
>
> Thias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/#enabling-changelog
>
> *From:* Őrhidi Mátyás 
> *Sent:* Wednesday, September 13, 2023 2:47 PM
> *To:* Gyula Fóra 
> *Cc:* Hangxiang Yu ; user@flink.apache.org
> *Subject:* Re: Checkpoint jitter?
>
> Correct, thanks for the clarification Gyula!
>
> On Wed, Sep 13, 2023 at 1:39 AM Gyula Fóra  wrote:
>
> No, I think what he means is to trigger the checkpoint at slightly
> different times at the different sources so the different parts of the
> pipeline would not checkpoint at the same time.
>
> Gyula
>
> On Wed, Sep 13, 2023 at 10:32 AM Hangxiang Yu  wrote:
>
> Hi, Matyas.
>
> Do you mean something like adjusting checkpoint intervals dynamically
> or the frequency of uploading files according to the pressure of the
> durable storage?
>
> On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás 
> wrote:
>
> Hey folks,
>
> Is it possible to add some sort of jitter to the checkpointing logic for
> massively parallel jobs to mitigate the burst impact on the durable storage
> when a checkpoint is triggered?
>
> Thanks,
>
> Matyas
>
> --
>
> Best,
> Hangxiang.
>
>


RE: Checkpoint jitter?

2023-09-13 Thread Schwalbe Matthias

Hi Mátyás,

Checkpoints are meant to be atomic in nature, i.e. everything is checkpointed at 
more or less the same time.
What you can do in newer Flink versions is to enable the Changelog feature 
(see [1]), which spreads the actual I/O for writing checkpoint files over a longer 
period and keeps an additional changelog file with the running updates.
What you get is a little more overall I/O but a quite flat I/O rate.


Hope this helps

Thias


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/#enabling-changelog
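
For reference, a minimal sketch of turning the changelog state backend on programmatically (assuming Flink ≥ 1.15; the flink-conf equivalent should be state.backend.changelog.enabled, please verify against your version) — illustrative only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep the regular checkpoint interval; the changelog backend continuously
        // writes state changes so checkpoint-time I/O is spread out.
        env.enableCheckpointing(60_000);
        env.enableChangelogStateBackend(true);
    }
}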


From: Őrhidi Mátyás 
Sent: Wednesday, September 13, 2023 2:47 PM
To: Gyula Fóra 
Cc: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint jitter?

Correct, thanks for the clarification Gyula!

On Wed, Sep 13, 2023 at 1:39 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
No, I think what he means is to trigger the checkpoint at slightly different 
times at the different sources so the different parts of the pipeline would not 
checkpoint at the same time.

Gyula

On Wed, Sep 13, 2023 at 10:32 AM Hangxiang Yu <master...@gmail.com> wrote:
Hi, Matyas.
Do you mean something like adjusting checkpoint intervals dynamically or 
frequency of uploading files according to the pressure of the durable storage ?

On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
Hey folks,

Is it possible to add some sort of jitter to the checkpointing logic for 
massively parallel jobs to mitigate the burst impact on the durable storage 
when a checkpoint is triggered?

Thanks,
Matyas


--
Best,
Hangxiang.
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Re: Checkpoint jitter?

2023-09-13 Thread Őrhidi Mátyás
Correct, thanks for the clarification Gyula!

On Wed, Sep 13, 2023 at 1:39 AM Gyula Fóra  wrote:

> No, I think what he means is to trigger the checkpoint at slightly
> different times at the different sources so the different parts of the
> pipeline would not checkpoint at the same time.
>
> Gyula
>
> On Wed, Sep 13, 2023 at 10:32 AM Hangxiang Yu  wrote:
>
>> Hi, Matyas.
>> Do you mean something like adjusting checkpoint intervals dynamically
>> or frequency of uploading files according to the pressure of the durable
>> storage ?
>>
>> On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás 
>> wrote:
>>
>>> Hey folks,
>>>
>>> Is it possible to add some sort of jitter to the checkpointing logic for
>>> massively parallel jobs to mitigate the burst impact on the durable storage
>>> when a checkpoint is triggered?
>>>
>>> Thanks,
>>> Matyas
>>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Re: Checkpoint jitter?

2023-09-13 Thread Gyula Fóra
No, I think what he means is to trigger the checkpoint at slightly
different times at the different sources so the different parts of the
pipeline would not checkpoint at the same time.

Gyula

On Wed, Sep 13, 2023 at 10:32 AM Hangxiang Yu  wrote:

> Hi, Matyas.
> Do you mean something like adjusting checkpoint intervals dynamically
> or frequency of uploading files according to the pressure of the durable
> storage ?
>
> On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás 
> wrote:
>
>> Hey folks,
>>
>> Is it possible to add some sort of jitter to the checkpointing logic for
>> massively parallel jobs to mitigate the burst impact on the durable storage
>> when a checkpoint is triggered?
>>
>> Thanks,
>> Matyas
>>
>
>
> --
> Best,
> Hangxiang.
>


Re: Checkpoint jitter?

2023-09-12 Thread Hangxiang Yu
Hi, Matyas.
Do you mean something like adjusting checkpoint intervals dynamically
or frequency of uploading files according to the pressure of the durable
storage ?

On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás 
wrote:

> Hey folks,
>
> Is it possible to add some sort of jitter to the checkpointing logic for
> massively parallel jobs to mitigate the burst impact on the durable storage
> when a checkpoint is triggered?
>
> Thanks,
> Matyas
>


-- 
Best,
Hangxiang.


RE: Checkpoint/savepoint _metadata

2023-08-29 Thread Schwalbe Matthias
Hi Frederic,

I’ve once (upon a time 😊) had a similar situation when we changed from Flink 
1.8 to Flink 1.13 … It took me a long time to figure out.
Some hints where to start to look:

  *   The _metadata file is used for:
      *   job manager state
      *   smallish keyed state (in order to avoid too many small state files)
      *   operator state (non-keyed)
  *   Does the operator that is getting blocked in initialization use operator state?
      *   Look for some condition that might cause it to grow.
      *   In my case back then, a minor condition caused the operator state to be duplicated per operator parallelism when loading from a savepoint, which caused exponential growth per savepoint cycle.
  *   You can obtain a local copy of this savepoint and try to load it by means of the state-processor-api (a sketch follows below).
      *   Breaking into the debugger, at some point the _metadata file gets loaded, which allows you to determine which state actually had the runaway growth and what might have caused the duplication.
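
A rough sketch of that last hint, using the State Processor API (method names as of Flink 1.15/1.16 and may differ in other versions; the savepoint path, operator uid "my-operator-uid" and state name "my-state" are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InspectSavepointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Loading the savepoint copy reads the _metadata file; a good place for a breakpoint.
        SavepointReader savepoint =
                SavepointReader.read(env, "file:///tmp/savepoint-copy", new HashMapStateBackend());

        // Pull out the (non-keyed) operator state of the suspicious operator and inspect it.
        DataStream<String> operatorState =
                savepoint.readListState("my-operator-uid", "my-state", Types.STRING);
        operatorState.print();

        env.execute("inspect-savepoint");
    }
}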

I hope this helps

Thias



From: Frederic Leger 
Sent: Monday, August 28, 2023 12:30 PM
To: user@flink.apache.org
Subject: Checkpoint/savepoint _metadata



Hi team,

We use flink 1.16.0 with openjdk-11-jre mainly to run streaming jobs.
We do checkpoints with 2 min interval and savepoint when deploying new job 
version.
We also use rocksdb state backend for most of them.

We had a streaming job that ran for a long time without any issue, and during a new 
deployment we could not launch it anymore: it was getting stuck in CREATING on 
one task, then failing and restarting, and so on.
In this Flink job, we handle a large data stream using key-based grouping. 
Inside a ProcessFunction, we use MapState[Long, String] as our state storage, 
which keeps data with an associated time limit (TTL) of 30 days.
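
For illustration, a minimal sketch of a MapState<Long, String> descriptor with a 30-day TTL like the one described above (descriptor name, update type and cleanup settings are assumptions, not the actual job code):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlStateSketch {
    // Typically created in open() of the ProcessFunction and obtained via
    // getRuntimeContext().getMapState(descriptor).
    static MapStateDescriptor<Long, String> buildDescriptor() {
        MapStateDescriptor<Long, String> descriptor =
                new MapStateDescriptor<>("entries", Types.LONG, Types.STRING);

        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // With RocksDB, expired entries are dropped during compaction.
                .cleanupInRocksdbCompactFilter(1000)
                .build();
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }
}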

The most relevant error we got from the logs was :

2023-08-02 12:40:52,186 ERROR akka.remote.EndpointWriter
   [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.1.1.5:31336/user/rpc/taskmanager_0#1435767402]:
 max allowed size 10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 42582878 bytes.

The fix for this issue was to increase akka.framesize from the default 
(10MB) to 50MB:
akka.framesize: 52428800b

After 16h of uptime, we wanted to move the job back to its initial cluster, as 
it had been running fine since then, but after the savepoint was taken we could 
not launch it again and got this error:

2023-08-03 08:49:06,474 ERROR akka.remote.EndpointWriter
   [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@10.1.1.5:31358/user/rpc/taskmanager_0#1492669447]:
 max allowed size 52428800 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 679594586 bytes.

After some research it seems to be related to the _metadata file written when 
checkpointing/savepointing, and this file has grown enormously in the past 16h, 
from 50MB to more than 600MB, if we compare the first ERROR with the last one.

Since then we were unable to launch back the job.

Increasing akka.framesize from 50MB to 1GB made it possible to avoid the above 
errors, but one task remained in CREATING state until failure.
We then started to get java.lang.OutOfMemoryError: Java heap space on the 
jobmanager, followed by timeouts between the taskmanagers and the jobmanager.
To avoid the OOM on the jobmanager, we raised its heap size from 2GB to 20GB.
Increasing the timeouts led to other errors, such as java.lang.OutOfMemoryError: 
Java heap space on the taskmanagers, and so on, until the job finally timed out and failed.

2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - The heartbeat 
of ResourceManager with id be6b26eb0a0a54e636c9fbfc5f9815f3 timed out.
2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
ResourceManager connection be6b26eb0a0a54e636c9fbfc5f9815f3.
2023-08-03 10:28:32,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Connecting to 
ResourceManager 
akka.tcp://flink@10.1.1.3:46899/user/rpc/resourcemanager_0(a6fef33bff489d7e860c1017d2a34f50).
2023-08-03 10:28:38,411 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - The heartbeat 
of JobManager with id 39d49002792d881da6a5e7266c8ee58b timed out.
2023-08-03 10:28:38,412 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
JobManager connection for 

Re: Checkpoint size smaller than Savepoint size

2023-07-19 Thread Shammon FY
Hi Neha,

The HOP window will increase the size of the checkpoint, and I'm sorry
that I'm not very familiar with the HOP window.

If the configurations are all right, and you want to confirm if it's a HOP
window issue, I think you can submit a flink job without HOP window but
with regular agg operators, and observe whether the checkpoint and
savepoint meet expectations.

Best,
Shammon FY

On Tue, Jul 18, 2023 at 8:25 PM Neha .  wrote:

> Hi Shammon,
>
> These configs exist in Flink WebUI. We have set
> exeEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Do
> you think it can create some issues for the HOP(proctime, some interval,
> some interval) and not releasing the state for checkpoints?
> I am really confused about why savepoints are working fine and not
> checkpoints.
>
> On Tue, Jul 18, 2023 at 6:56 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think you can first check whether the options `state.backend` and
>> `state.backend.incremental` you mentioned above exist in
>> `JobManager`->`Configuration` in Flink webui. If they do not exist, you may
>> be using the wrong conf file.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:
>>
>>> Hi Shammon,
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>>
>>> This is already set in the Flink-conf. Anything else that should be
>>> taken care of for the incremental checkpointing? Is there any related bug
>>> in Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink
>>> 1.13.6.
>>> What can be the reason for stopped incremental checkpointing?
>>>
>>> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>>>
 Hi Neha,

 I noticed that the `Checkpointed Data Size` is always equals to `Full
 Checkpoint Data Size`, I think the job is using full checkpoint instead of
 incremental checkpoint,  you can check it

 Best,
 Shammon FY

 On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled the incremental checkpointing, Attaching the
> screenshot. Can you please elaborate on what makes you think it is not
> enabled, It might hint towards the issue. The problem is checkpoint size 
> is
> not going down and keeps on increasing while savepoint size shows the
> correct behavior of going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller
>> than the full data of a checkpoint. Flink uses rocksdb to store
>> checkpointed data, which is an LSM structured storage where the same key
>> will have multiple version records, while savepoint will traverse all 
>> keys
>> and store only one record per key.
>>
>> But I noticed that you did not enable incremental checkpoint, which
>> resulted in each checkpoint saving full data. You can refer to [1] for 
>> more
>> detail and turn it on, which will reduce the data size of the checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>> 
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>
>>> Hello  Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look if
>>> anything can be done?
>>>
>>> -- Forwarded message -
>>> From: Neha . 
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: 
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this 
>>> situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my
>>> datastream pipeline continues to grow while the savepoint size behaves 
>>> as
>>> expected. Could this be attributed to the usage of Common Table 
>>> Expressions
>>> (CTEs) in Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>> With Actuals as (
>>>   SELECT
>>> clientOrderId,
>>> Cast

Re: Checkpoint size smaller than Savepoint size

2023-07-18 Thread Neha . via user
Hi Shammon,

These configs exist in Flink WebUI. We have set
exeEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Do
you think it can create some issues for the HOP(proctime, some interval,
some interval) and not releasing the state for checkpoints?
I am really confused about why savepoints are working fine and not
checkpoints.

On Tue, Jul 18, 2023 at 6:56 AM Shammon FY  wrote:

> Hi Neha,
>
> I think you can first check whether the options `state.backend` and
> `state.backend.incremental` you mentioned above exist in
> `JobManager`->`Configuration` in Flink webui. If they do not exist, you may
> be using the wrong conf file.
>
> Best,
> Shammon FY
>
>
> On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:
>
>> Hi Shammon,
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>>
>> This is already set in the Flink-conf. Anything else that should be taken
>> care of for the incremental checkpointing? Is there any related bug in
>> Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink 1.13.6.
>> What can be the reason for stopped incremental checkpointing?
>>
>> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>>
>>> Hi Neha,
>>>
>>> I noticed that the `Checkpointed Data Size` is always equals to `Full
>>> Checkpoint Data Size`, I think the job is using full checkpoint instead of
>>> incremental checkpoint,  you can check it
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>>
 Hello Shammon,

 Thank you for your assistance.
 I have already enabled the incremental checkpointing, Attaching the
 screenshot. Can you please elaborate on what makes you think it is not
 enabled, It might hint towards the issue. The problem is checkpoint size is
 not going down and keeps on increasing while savepoint size shows the
 correct behavior of going up and down with the throughput peaks.

 [image: Screenshot 2023-07-17 at 7.49.19 AM.png]


 On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:

> Hi Neha,
>
> I think it is normal for the data size of a savepoint to be smaller
> than the full data of a checkpoint. Flink uses rocksdb to store
> checkpointed data, which is an LSM structured storage where the same key
> will have multiple version records, while savepoint will traverse all keys
> and store only one record per key.
>
> But I noticed that you did not enable incremental checkpoint, which
> resulted in each checkpoint saving full data. You can refer to [1] for 
> more
> detail and turn it on, which will reduce the data size of the checkpoint.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
> 
>
> Best,
> Shammon FY
>
>
> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>
>> Hello  Shammon FY,
>>
>> It is a production issue for me. Can you please take a look if
>> anything can be done?
>>
>> -- Forwarded message -
>> From: Neha . 
>> Date: Fri, Jul 14, 2023 at 4:06 PM
>> Subject: Checkpoint size smaller than Savepoint size
>> To: 
>>
>>
>> Hello,
>>
>> According to Flink's documentation, Checkpoints are designed to be
>> lightweight. However, in my Flink pipeline, I have observed that the
>> savepoint sizes are smaller than the checkpoints. Is this expected
>> behavior? What are the possible scenarios that can lead to this 
>> situation?
>>
>> Additionally, I have noticed that the checkpoint size in my
>> datastream pipeline continues to grow while the savepoint size behaves as
>> expected. Could this be attributed to the usage of Common Table 
>> Expressions
>> (CTEs) in Flink SQL?
>>
>> Flink version: 1.16.1
>> Incremental checkpointing is enabled.
>> StateBackend: RocksDB
>> Time Characteristic: Ingestion
>>
>> SQL:
>>
>> SELECT
>>   *
>> from
>>   (
>> With Actuals as (
>>   SELECT
>> clientOrderId,
>> Cast(
>>   ValueFromKeyCacheUDF(
>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>   ) as INT
>> ) as zoneId,
>> cityId,
>> case
>>   when status = 'ASSIGNED' then 1
>>   else 0
>> end as acceptance_flag,
>> unicast.proctime
>>   FROM
>> order
>> INNER JOIN unicast_df ON unicast.clientOrderId =
>> order.order_id
>> AND order.proctime BETWEEN unicast.proctime - interval '70'
>> minute
>> AND unicast.proc

Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Shammon FY
Hi Neha,

I think you can first check whether the options `state.backend` and
`state.backend.incremental` you mentioned above exist in
`JobManager`->`Configuration` in Flink webui. If they do not exist, you may
be using the wrong conf file.

Best,
Shammon FY


On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:

> Hi Shammon,
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> This is already set in the Flink-conf. Anything else that should be taken
> care of for the incremental checkpointing? Is there any related bug in
> Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink 1.13.6.
> What can be the reason for stopped incremental checkpointing?
>
> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I noticed that the `Checkpointed Data Size` is always equals to `Full
>> Checkpoint Data Size`, I think the job is using full checkpoint instead of
>> incremental checkpoint,  you can check it
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>
>>> Hello Shammon,
>>>
>>> Thank you for your assistance.
>>> I have already enabled the incremental checkpointing, Attaching the
>>> screenshot. Can you please elaborate on what makes you think it is not
>>> enabled, It might hint towards the issue. The problem is checkpoint size is
>>> not going down and keeps on increasing while savepoint size shows the
>>> correct behavior of going up and down with the throughput peaks.
>>>
>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>
>>>
>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>>
 Hi Neha,

 I think it is normal for the data size of a savepoint to be smaller
 than the full data of a checkpoint. Flink uses rocksdb to store
 checkpointed data, which is an LSM structured storage where the same key
 will have multiple version records, while savepoint will traverse all keys
 and store only one record per key.

 But I noticed that you did not enable incremental checkpoint, which
 resulted in each checkpoint saving full data. You can refer to [1] for more
 detail and turn it on, which will reduce the data size of the checkpoint.

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
 

 Best,
 Shammon FY


 On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:

> Hello  Shammon FY,
>
> It is a production issue for me. Can you please take a look if
> anything can be done?
>
> -- Forwarded message -
> From: Neha . 
> Date: Fri, Jul 14, 2023 at 4:06 PM
> Subject: Checkpoint size smaller than Savepoint size
> To: 
>
>
> Hello,
>
> According to Flink's documentation, Checkpoints are designed to be
> lightweight. However, in my Flink pipeline, I have observed that the
> savepoint sizes are smaller than the checkpoints. Is this expected
> behavior? What are the possible scenarios that can lead to this situation?
>
> Additionally, I have noticed that the checkpoint size in my datastream
> pipeline continues to grow while the savepoint size behaves as expected.
> Could this be attributed to the usage of Common Table Expressions (CTEs) 
> in
> Flink SQL?
>
> Flink version: 1.16.1
> Incremental checkpointing is enabled.
> StateBackend: RocksDB
> Time Characteristic: Ingestion
>
> SQL:
>
> SELECT
>   *
> from
>   (
> With Actuals as (
>   SELECT
> clientOrderId,
> Cast(
>   ValueFromKeyCacheUDF(
> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>   ) as INT
> ) as zoneId,
> cityId,
> case
>   when status = 'ASSIGNED' then 1
>   else 0
> end as acceptance_flag,
> unicast.proctime
>   FROM
> order
> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
> AND order.proctime BETWEEN unicast.proctime - interval '70'
> minute
> AND unicast.proctime + interval '10' minute
> and unicast.status in ('ASSIGNED', 'REJECTED')
> ),
> zone_agg as (
>   select
> zoneId,
> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
> avg(cityId) as cityId,
> COUNT(*) as `unicast_count`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
>>

Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Neha . via user
Hi Shammon,

state.backend: rocksdb
state.backend.incremental: true

This is already set in the Flink-conf. Anything else that should be taken
care of for the incremental checkpointing? Is there any related bug in
Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink 1.13.6.
What can be the reason for stopped incremental checkpointing?

On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:

> Hi Neha,
>
> I noticed that the `Checkpointed Data Size` is always equals to `Full
> Checkpoint Data Size`, I think the job is using full checkpoint instead of
> incremental checkpoint,  you can check it
>
> Best,
> Shammon FY
>
> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>
>> Hello Shammon,
>>
>> Thank you for your assistance.
>> I have already enabled the incremental checkpointing, Attaching the
>> screenshot. Can you please elaborate on what makes you think it is not
>> enabled, It might hint towards the issue. The problem is checkpoint size is
>> not going down and keeps on increasing while savepoint size shows the
>> correct behavior of going up and down with the throughput peaks.
>>
>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>
>>
>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>
>>> Hi Neha,
>>>
>>> I think it is normal for the data size of a savepoint to be smaller than
>>> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
>>> data, which is an LSM structured storage where the same key will have
>>> multiple version records, while savepoint will traverse all keys and store
>>> only one record per key.
>>>
>>> But I noticed that you did not enable incremental checkpoint, which
>>> resulted in each checkpoint saving full data. You can refer to [1] for more
>>> detail and turn it on, which will reduce the data size of the checkpoint.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>> 
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>>
 Hello  Shammon FY,

 It is a production issue for me. Can you please take a look if anything
 can be done?

 -- Forwarded message -
 From: Neha . 
 Date: Fri, Jul 14, 2023 at 4:06 PM
 Subject: Checkpoint size smaller than Savepoint size
 To: 


 Hello,

 According to Flink's documentation, Checkpoints are designed to be
 lightweight. However, in my Flink pipeline, I have observed that the
 savepoint sizes are smaller than the checkpoints. Is this expected
 behavior? What are the possible scenarios that can lead to this situation?

 Additionally, I have noticed that the checkpoint size in my datastream
 pipeline continues to grow while the savepoint size behaves as expected.
 Could this be attributed to the usage of Common Table Expressions (CTEs) in
 Flink SQL?

 Flink version: 1.16.1
 Incremental checkpointing is enabled.
 StateBackend: RocksDB
 Time Characteristic: Ingestion

 SQL:

 SELECT
   *
 from
   (
 With Actuals as (
   SELECT
 clientOrderId,
 Cast(
   ValueFromKeyCacheUDF(
 concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
   ) as INT
 ) as zoneId,
 cityId,
 case
   when status = 'ASSIGNED' then 1
   else 0
 end as acceptance_flag,
 unicast.proctime
   FROM
 order
 INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
 AND order.proctime BETWEEN unicast.proctime - interval '70'
 minute
 AND unicast.proctime + interval '10' minute
 and unicast.status in ('ASSIGNED', 'REJECTED')
 ),
 zone_agg as (
   select
 zoneId,
 (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
 avg(cityId) as cityId,
 COUNT(*) as `unicast_count`,
 proctime() as proctime
   from
 Actuals
   group by
 HOP(
   proctime(),
   interval '5' minute,
   interval '30' minute
 ),
 zoneId
 ),
 city_agg as(
   select
 cityId,
 sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
 proctime() as proctime
   from
 Actuals
   group by
 HOP(
   proctime(),
   interval '5' minute,
   interval '30' minute
 ),
 cityId
 ),
 final as (
   sele

Re: Checkpoint size smaller than Savepoint size

2023-07-16 Thread Shammon FY
Hi Neha,

I noticed that the `Checkpointed Data Size` is always equals to `Full
Checkpoint Data Size`, I think the job is using full checkpoint instead of
incremental checkpoint,  you can check it

Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled the incremental checkpointing, Attaching the
> screenshot. Can you please elaborate on what makes you think it is not
> enabled, It might hint towards the issue. The problem is checkpoint size is
> not going down and keeps on increasing while savepoint size shows the
> correct behavior of going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
>> data, which is an LSM structured storage where the same key will have
>> multiple version records, while savepoint will traverse all keys and store
>> only one record per key.
>>
>> But I noticed that you did not enable incremental checkpoint, which
>> resulted in each checkpoint saving full data. You can refer to [1] for more
>> detail and turn it on, which will reduce the data size of the checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>> 
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>
>>> Hello  Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look if anything
>>> can be done?
>>>
>>> -- Forwarded message -
>>> From: Neha . 
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: 
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my datastream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>> With Actuals as (
>>>   SELECT
>>> clientOrderId,
>>> Cast(
>>>   ValueFromKeyCacheUDF(
>>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>   ) as INT
>>> ) as zoneId,
>>> cityId,
>>> case
>>>   when status = 'ASSIGNED' then 1
>>>   else 0
>>> end as acceptance_flag,
>>> unicast.proctime
>>>   FROM
>>> order
>>> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>> AND order.proctime BETWEEN unicast.proctime - interval '70'
>>> minute
>>> AND unicast.proctime + interval '10' minute
>>> and unicast.status in ('ASSIGNED', 'REJECTED')
>>> ),
>>> zone_agg as (
>>>   select
>>> zoneId,
>>> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>> avg(cityId) as cityId,
>>> COUNT(*) as `unicast_count`,
>>> proctime() as proctime
>>>   from
>>> Actuals
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' minute,
>>>   interval '30' minute
>>> ),
>>> zoneId
>>> ),
>>> city_agg as(
>>>   select
>>> cityId,
>>> sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>> proctime() as proctime
>>>   from
>>> Actuals
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' minute,
>>>   interval '30' minute
>>> ),
>>> cityId
>>> ),
>>> final as (
>>>   select
>>> zone_agg.zoneId,
>>> zone_agg.cityId,
>>> avg(zone_agg.unicast_count) as unicast_count,
>>> avg(zone_agg.zone_quotient) as zone_quotient,
>>> avg(city_agg.city_quotient) as city_quotient
>>>   from
>>> city_agg
>>> INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>> AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60'
>>> minute
>>> AND zone_agg.proctime
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' m

Re: Checkpoint size smaller than Savepoint size

2023-07-16 Thread Neha . via user
Hello Shammon,

Thank you for your assistance.
I have already enabled the incremental checkpointing, Attaching the
screenshot. Can you please elaborate on what makes you think it is not
enabled, It might hint towards the issue. The problem is checkpoint size is
not going down and keeps on increasing while savepoint size shows the
correct behavior of going up and down with the throughput peaks.

[image: Screenshot 2023-07-17 at 7.49.19 AM.png]


On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:

> Hi Neha,
>
> I think it is normal for the data size of a savepoint to be smaller than
> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
> data, which is an LSM structured storage where the same key will have
> multiple version records, while savepoint will traverse all keys and store
> only one record per key.
>
> But I noticed that you did not enable incremental checkpoint, which
> resulted in each checkpoint saving full data. You can refer to [1] for more
> detail and turn it on, which will reduce the data size of the checkpoint.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
> 
>
> Best,
> Shammon FY
>
>
> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>
>> Hello  Shammon FY,
>>
>> It is a production issue for me. Can you please take a look if anything
>> can be done?
>>
>> -- Forwarded message -
>> From: Neha . 
>> Date: Fri, Jul 14, 2023 at 4:06 PM
>> Subject: Checkpoint size smaller than Savepoint size
>> To: 
>>
>>
>> Hello,
>>
>> According to Flink's documentation, Checkpoints are designed to be
>> lightweight. However, in my Flink pipeline, I have observed that the
>> savepoint sizes are smaller than the checkpoints. Is this expected
>> behavior? What are the possible scenarios that can lead to this situation?
>>
>> Additionally, I have noticed that the checkpoint size in my datastream
>> pipeline continues to grow while the savepoint size behaves as expected.
>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>> Flink SQL?
>>
>> Flink version: 1.16.1
>> Incremental checkpointing is enabled.
>> StateBackend: RocksDB
>> Time Characteristic: Ingestion
>>
>> SQL:
>>
>> SELECT
>>   *
>> from
>>   (
>> With Actuals as (
>>   SELECT
>> clientOrderId,
>> Cast(
>>   ValueFromKeyCacheUDF(
>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>   ) as INT
>> ) as zoneId,
>> cityId,
>> case
>>   when status = 'ASSIGNED' then 1
>>   else 0
>> end as acceptance_flag,
>> unicast.proctime
>>   FROM
>> order
>> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>> AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>> AND unicast.proctime + interval '10' minute
>> and unicast.status in ('ASSIGNED', 'REJECTED')
>> ),
>> zone_agg as (
>>   select
>> zoneId,
>> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>> avg(cityId) as cityId,
>> COUNT(*) as `unicast_count`,
>> proctime() as proctime
>>   from
>> Actuals
>>   group by
>> HOP(
>>   proctime(),
>>   interval '5' minute,
>>   interval '30' minute
>> ),
>> zoneId
>> ),
>> city_agg as(
>>   select
>> cityId,
>> sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>> proctime() as proctime
>>   from
>> Actuals
>>   group by
>> HOP(
>>   proctime(),
>>   interval '5' minute,
>>   interval '30' minute
>> ),
>> cityId
>> ),
>> final as (
>>   select
>> zone_agg.zoneId,
>> zone_agg.cityId,
>> avg(zone_agg.unicast_count) as unicast_count,
>> avg(zone_agg.zone_quotient) as zone_quotient,
>> avg(city_agg.city_quotient) as city_quotient
>>   from
>> city_agg
>> INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>> AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60'
>> minute
>> AND zone_agg.proctime
>>   group by
>> HOP(
>>   proctime(),
>>   interval '5' minute,
>>   interval '30' minute
>> ),
>> zone_agg.zoneId,
>> zone_agg.cityId
>> ),
>> new_final as (
>>   select
>> 'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>> zone_quotient,
>> city_quotient,
>> case
>>   when unicast_count > 5 then zone_quotient
>>   else city_quotient
>> end as `value`
>>   

Re: Checkpoint size smaller than Savepoint size

2023-07-16 Thread Shammon FY
Hi Neha,

I think it is normal for the data size of a savepoint to be smaller than
the full data of a checkpoint. Flink uses RocksDB to store checkpointed
data, which is an LSM-structured store where the same key can have
multiple versioned records, while a savepoint traverses all keys and stores
only one record per key.

But I noticed that you did not enable incremental checkpoint, which
resulted in each checkpoint saving full data. You can refer to [1] for more
detail and turn it on, which will reduce the data size of the checkpoint.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
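
A minimal sketch of enabling it programmatically (equivalent to setting state.backend.incremental: true as described in [1]; illustrative only, not your job code):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true = incremental checkpoints: only newly created SST files are uploaded.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(30_000);
    }
}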

Best,
Shammon FY


On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:

> Hello  Shammon FY,
>
> It is a production issue for me. Can you please take a look if anything
> can be done?
>
> -- Forwarded message -
> From: Neha . 
> Date: Fri, Jul 14, 2023 at 4:06 PM
> Subject: Checkpoint size smaller than Savepoint size
> To: 
>
>
> Hello,
>
> According to Flink's documentation, Checkpoints are designed to be
> lightweight. However, in my Flink pipeline, I have observed that the
> savepoint sizes are smaller than the checkpoints. Is this expected
> behavior? What are the possible scenarios that can lead to this situation?
>
> Additionally, I have noticed that the checkpoint size in my datastream
> pipeline continues to grow while the savepoint size behaves as expected.
> Could this be attributed to the usage of Common Table Expressions (CTEs) in
> Flink SQL?
>
> Flink version: 1.16.1
> Incremental checkpointing is enabled.
> StateBackend: RocksDB
> Time Characteristic: Ingestion
>
> SQL:
>
> SELECT
>   *
> from
>   (
> With Actuals as (
>   SELECT
> clientOrderId,
> Cast(
>   ValueFromKeyCacheUDF(
> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>   ) as INT
> ) as zoneId,
> cityId,
> case
>   when status = 'ASSIGNED' then 1
>   else 0
> end as acceptance_flag,
> unicast.proctime
>   FROM
> order
> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
> AND order.proctime BETWEEN unicast.proctime - interval '70' minute
> AND unicast.proctime + interval '10' minute
> and unicast.status in ('ASSIGNED', 'REJECTED')
> ),
> zone_agg as (
>   select
> zoneId,
> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
> avg(cityId) as cityId,
> COUNT(*) as `unicast_count`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> zoneId
> ),
> city_agg as(
>   select
> cityId,
> sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> cityId
> ),
> final as (
>   select
> zone_agg.zoneId,
> zone_agg.cityId,
> avg(zone_agg.unicast_count) as unicast_count,
> avg(zone_agg.zone_quotient) as zone_quotient,
> avg(city_agg.city_quotient) as city_quotient
>   from
> city_agg
> INNER join zone_agg on zone_agg.cityId = city_agg.cityId
> AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60'
> minute
> AND zone_agg.proctime
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> zone_agg.zoneId,
> zone_agg.cityId
> ),
> new_final as (
>   select
> 'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
> zone_quotient,
> city_quotient,
> case
>   when unicast_count > 5 then zone_quotient
>   else city_quotient
> end as `value`
>   from
> final
> )
> select
>   key,
>   case
> when new_final.`value` > 1 then 1
> else new_final.`value`
>   end as `value`,
>   zone_quotient,
>   city_quotient
> from
>   new_final
>   )
>
>
>
>
> --
> IMPORTANT NOTICE:  The contents of this email and any attachments are
> confidential in nature and intended solely for the addressee, and are
> subject to the terms and conditions of disclosure as further described
> here: https://www.scd.swiggy.in/nda. If you are not the intended
> recipient or you do not agree to the terms and conditions of disclosure,
> please delete this email immediately, and notify the sender by return
> email. In the event that you continue to access the information herein or
> act upon it in any manner, the terms and conditions shall be deemed
> accepted by you.


RE: Checkpoint directories not cleared as TaskManagers run

2022-05-19 Thread Schwalbe Matthias
Hi James,

Let me give some short answers (there is documentation that describes this better):

>> - why do taskmanagers create the chk-x directory but only the jobmanager can 
>> delete it? Shouldn’t the jobmanager be the only component creating and 
>> deleting these directories? That would seem more consistent to me but maybe 
>> there is a reason.

  *   Assuming proper setup, i.e. checkpoint directory is on a shared folder
  *   Tasks and state thereof are split as subtasks to separate slots 
(according to parallelism)
  *   When checkpoints are written each state primitive on each resp. subtask 
writes its portion of state to the checkpoint folder and forwards the filename 
to the job manager
  *   For incremental checkpoints some files also remain in older checkpoint 
folders until obsolete
  *   This process is managed by jobmanager
  *   At the end of each checkpoint, the jobmanager writes a _metadata file to the 
resp. checkpoint folder containing (simplified) the filenames of the respective 
states and any small state
  *   When a new checkpoint is finished, jobmanager decides according to 
configuration which old checkpoint files become obsolete and hence deleted
  *   In general checkpoints and savepoints are for high-availability purposes; 
if the checkpoint data were in a local folder of a machine that crashed, it would 
not be available for a restart of the job
  *   The parts that should be on a local (and fast) drive are the ones used by 
RocksDB; these are ephemeral and can (and will) be recreated on job recovery
>>  - I see many files under each chk-x folder. Can anyone confirm if each file 
>> is wholly owned by a single task manager? ie is each file only written by 1 
>> TM? Otherwise there could be file locking and contention.

  *   Mostly explained above … however
  *   If two taskmanagers happen to be started on the same machine (uncommon 
for k8s, common for Yarn resource manager) they would use the same folder
  *   Filenames contain a uuid which is unlikely to collide
>> - we are now looking to add in NFS mounts for our containers so all the job 
>> managers and taskmanagers share the same path. Can anyone confirm if NFS is 
>> a ‘reliable’ storage mechanism as we have heard many stories how problematic 
>> it can be. We are not yet able to use HDFS or S3.

  *   NFS is not reliable, probably not fit for PROD purposes, don’t know about 
some NAS setup that uses NFS and has integrated reliability …
>> - if Flink can not write to NFS my understanding is although the checkpoint 
>> will fail the Flink process will carry on and try again at the next 
>> checkpoint. It will not cause my program to fail correct?

  *   Imho there would be no reason to set up checkpointing in the first place 
if you cannot restart a job from such a checkpoint
  *   This is only important, of course, if you need reliability, or exactly 
once semantics …
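
For illustration, a minimal sketch of pointing checkpoints at a commonly accessible location (the path below is a placeholder for a shared mount, HDFS or S3 URI; equivalent to setting state.checkpoints.dir in flink-conf.yaml):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedCheckpointDirSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Must be reachable under the same path from the jobmanager and every taskmanager.
        env.getCheckpointConfig().setCheckpointStorage("file:///shared/flink/checkpoints");
    }
}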

Thias

From: James Sandys-Lumsdaine 
Sent: Wednesday, May 18, 2022 2:53 PM
To: Schwalbe Matthias 
Cc: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hello Matthias,

Thanks for your reply. Yes indeed your are correct. My /tmp path is private so 
you have confirmed what I thought was happening.

I have some follow up questions:
- why do taskmanagers create the chk-x directory but only the jobmanager can 
delete it? Shouldn’t the jobmanager be the only component creating and deleting 
these directories? That would seem more consistent to me but maybe there is a 
reason.
- I see many files under each chk-x folder. Can anyone confirm if each file is 
wholly owned by a single task manager? ie is each file only written by 1 TM? 
Otherwise there could be file locking and contention.
- we are now looking to add in NFS mounts for our containers so all the job 
managers and taskmanagers share the same path. Can anyone confirm if NFS is a 
‘reliable’ storage mechanism as we have heard many stories how problematic it 
can be. We are not yet able to use HDFS or S3.
- if Flink can not write to NFS my understanding is although the checkpoint 
will fail the Flink process will carry on and try again at the next checkpoint. 
It will not cause my program to fail correct?

Many thanks again,

James.

Sent from my iPhone


On 17 May 2022, at 15:17, Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:

Hi James,

From reading the thread … I assume, your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by jobmanager and all taskmanagers in order to work
- as your jobmanager cannot access the checkpoint files of the taskmanagers, it 
can also not clean up those files

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine mailto:jas...@hotmail.com>>
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu ; user@flink.apache.org

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-18 Thread James Sandys-Lumsdaine
Hello Matthias,

Thanks for your reply. Yes indeed, you are correct. My /tmp path is private, so 
you have confirmed what I thought was happening.

I have some follow up questions:
- why do taskmanagers create the chk-x directory but only the jobmanager can 
delete it? Shouldn’t the jobmanager be the only component creating and deleting 
these directories? That would seem more consistent to me but maybe there is a 
reason.
- I see many files under each chk-x folder. Can anyone confirm if each file is 
wholly owned by a single task manager? ie is each file only written by 1 TM? 
Otherwise there could be file locking and contention.
- we are now looking to add in NFS mounts for our containers so all the job 
managers and taskmanagers share the same path. Can anyone confirm if NFS is a 
‘reliable’ storage mechanism as we have heard many stories how problematic it 
can be. We are not yet able to use HDFS or S3.
- if Flink can not write to NFS my understanding is although the checkpoint 
will fail the Flink process will carry on and try again at the next checkpoint. 
It will not cause my program to fail correct?

Many thanks again,

James.

Sent from my iPhone

On 17 May 2022, at 15:17, Schwalbe Matthias  wrote:


Hi James,

From reading the thread … I assume, your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by jobmanager and all taskmanagers in order to work
- as your jobmanager cannot access the checkpoint files of the taskmanagers, it 
can also not clean up those files

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine 
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) are reading and writing to their own 
/tmp. i.e. there is no shared access like if it was NFS or HDFS.
So when the checkpointing happens the directories are created and populated, but 
only the JM's old checkpoint directories are cleaned up. The old "chk-x" 
directories under each TM's /tmp/Flink/State remain and are not cleared up.

From your email I don't know if you think I am writing to a "shared" path or 
not?

I started looking at the in-memory checkpoint storage, but its maximum size is 
an int so it can't accommodate 5GB of state. I need the checkpointing to trigger 
my sinks to persist (GenericWriteAheadSink), so it seems I have to create a 
proper shared file path that all my containers can access.
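
For illustration, a minimal sketch of that last point, assuming a shared mount 
(e.g. NFS) that is visible under the same path on the jobmanager and every 
taskmanager container; the path itself is a placeholder:

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
// the same URI resolves to the same shared directory on every container
env.getCheckpointConfig().setCheckpointStorage("file:///mnt/flink-shared/checkpoints");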

James.

From: Hangxiang Yu mailto:master...@gmail.com>>
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine mailto:jas...@hotmail.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not get what the problem is.
All checkpoints will be stored in the address you set.
IIUC, the TMs will write some checkpoint info to their local dir, then upload it 
to the address, and then delete the local copy.
The JM will write some metas of the checkpoint to the address and also does the 
entire deletion for checkpoints.
Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> wrote:
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folder on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,



I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.



When I run from my own IDE, I see only the latest "chk-x" directory under the 
job id folder. So the first checkpoint is "chk-1", which is then replaced with 
"chk-2" etc.

RE: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Schwalbe Matthias
Hi James,

From reading the thread … I assume, your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by jobmanager and all taskmanagers in order to work
- as your jobmanager cannot access the checkpoint files of the taskmanagers, it 
can also not clean up those files

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine 
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) are reading and writing to their own 
/tmp. i.e. there is no shared access like if it was NFS or HDFS.
So when the checkpointing happens the directories are created and populated, but 
only the JM's old checkpoint directories are cleaned up. The old "chk-x" 
directories under each TM's /tmp/Flink/State remain and are not cleared up.

From your email I don't know if you think I am writing to a "shared" path or 
not?

I started looking at the in-memory checkpoint storage, but its maximum size is 
an int so it can't accommodate 5GB of state. I need the checkpointing to trigger 
my sinks to persist (GenericWriteAheadSink), so it seems I have to create a 
proper shared file path that all my containers can access.

James.

From: Hangxiang Yu mailto:master...@gmail.com>>
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine mailto:jas...@hotmail.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not get what the problem is.
All checkpoints will be stored in the address you set.
IIUC, the TMs will write some checkpoint info to their local dir, then upload it 
to the address, and then delete the local copy.
The JM will write some metas of the checkpoint to the address and also does the 
entire deletion for checkpoints.
Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> wrote:
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folder on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,



I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.



When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.



However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.



Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.



My set up is essentially below (trimmed for simplicity):

   Configuration conf = new Configuration();

conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);


conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);



env.enableCheckpointing(5 * 1000);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);



env.setStateBackend(new HashMapStateBackend());

env.getCheckpointConfig().setCheckpointStorage("f

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) are reading and writing to their own 
/tmp. i.e. there is no shared access like if it was NFS or HDFS.

So when the checkpointing happens the directories are created and populated, but 
only the JM's old checkpoint directories are cleaned up. The old "chk-x" 
directories under each TM's /tmp/Flink/State remain and are not cleared up.

From your email I don't know if you think I am writing to a "shared" path or 
not?

I started looking at the in-memory checkpoint storage, but its maximum size is 
an int so it can't accommodate 5GB of state. I need the checkpointing to trigger 
my sinks to persist (GenericWriteAheadSink), so it seems I have to create a 
proper shared file path that all my containers can access.

James.

From: Hangxiang Yu 
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine ; user@flink.apache.org 

Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not get what the problem is.
All checkpoints will be stored in the address you set.
IIUC, the TMs will write some checkpoint info to their local dir, then upload it 
to the address, and then delete the local copy.
The JM will write some metas of the checkpoint to the address and also does the 
entire deletion for checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> wrote:
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folder on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

   Configuration conf = new Configuration();

conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);


conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);



env.enableCheckpointing(5 * 1000);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);



env.setStateBackend(new HashMapStateBackend());

env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Hangxiang Yu
Hi, James.
I may not get what the problem is.
All checkpoints will be stored in the address you set.
IIUC, the TMs will write some checkpoint info to their local dir, then
upload it to the address, and then delete the local copy.
The JM will write some metas of the checkpoint to the address and also does the
entire deletion for checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
wrote:

> Some further Googling says on a StackOverflow posting it is the jobmanager
> that does the deletion and not the taskmanagers.
>
> Currently my taskmanagers are writing their checkpoints to their own
> private disks (/tmp) rather than a share - so my suspicion is the
> jobmanager can't access the folder on the other machines. I thought the
> taskmanagers could clear up their own state when instructed to by the
> jobmanager.
>
> I can not yet use an nfs mount in my deployment so I may have to switch to
> heap checkpoint state instead of using the file storage checkpoint system.
> Now I understand what's going on a bit better it seems pointless for me to
> have file checkpoints that can't be read by the jobmanager for failover.
>
> If anyone can clarify/correct me I would appreciate.
>
> James.
> --
> *From:* James Sandys-Lumsdaine
> *Sent:* 16 May 2022 18:52
> *To:* user@flink.apache.org 
> *Subject:* Checkpoint directories not cleared as TaskManagers run
>
>
> Hello,
>
>
> I'm seeing my Flink deployment's checkpoint storage directories build up
> and never clear down.
>
>
> When I run from my own IDE, I see only the latest "chk-x" directory
> under the job id folder. So the first checkpoint is "chk-1", which is then
> replaced with "chk-2" etc.
>
>
> However, when I run as a proper application mode deployment, each of the 4
> taskmanagers running in their own containers retain every one of the
> "chk-x" directories meaning they eat a lot of disk space after as time
> progresses. Interestingly, the jobmanager itself is fine.
>
>
> Does anyone have any suggestion on how to debug this? Anything obvious
> that would cause such behaviour? I'm currently using Flink 1.14.0.
>
>
> My set up is essentially below (trimmed for simplicity):
>
>Configuration conf = new Configuration();
>
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>
>
> conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
> true);
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
>
>
> env.enableCheckpointing(5 * 1000);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);
>
>
>
> env.setStateBackend(new HashMapStateBackend());
>
>
> env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");
>
>
> Thanks in advance,
>
> James.
>
>


Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folder on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

   Configuration conf = new Configuration();

conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);


conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, 
true);

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);



env.enableCheckpointing(5 * 1000);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);



env.setStateBackend(new HashMapStateBackend());

env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: Checkpoint Timeout Troubleshooting

2022-05-05 Thread Sam Ch
Thank you for the help. To follow up, the issue went away when we reverted
to Flink 1.13. It may be related to FLINK-27481. Before reverting, we
tested unaligned checkpoints with a timeout of 10 minutes, which still timed out.
Thanks.

On Thu, Apr 28, 2022, 5:38 PM Guowei Ma  wrote:

> Hi Sam
>
> I think the first step is to see which part of your Flink APP is blocking
> the completion of Checkpoint. Specifically, you can refer to the
> "Checkpoint Details" section of the document [1]. Using these methods, you
> should be able to observe where the checkpoint is blocked, for example, it
> may be an agg operator of the app, or it may be blocked on the sink of
> kafka.
> Once you know which operator is blocking, you can use FlameGraph [2] to
> see where the bottleneck of the operator is. Then do specific operations.
>
> Hope these help!
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/#checkpoint-details
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/
>
> Best,
> Guowei
>
>
> On Fri, Apr 29, 2022 at 2:10 AM Sam Ch  wrote:
>
>> Hello,
>>
>> I am running into checkpoint timeouts and am looking for guidance on
>> troubleshooting. What should I be looking at? What configuration parameters
>> would affect this? I am afraid I am a Flink newbie so I am still picking up
>> the concepts. Additional notes are below, anything else I can provide?
>> Thanks.
>>
>>
>> The checkpoint size is small (less than 100kB)
>> Multiple flink apps are running on a cluster, only one is running into
>> checkpoint timeouts
>> Timeout is set to 10 mins
>> Tried aligned and unaligned checkpoints
>> Tried clearing checkpoints to start fresh
>> Plenty of disk space
>> Dataflow: kafka source -> flink app -> kafka sink
>>
>


Re: Checkpoint Timeout Troubleshooting

2022-04-28 Thread Guowei Ma
Hi Sam

I think the first step is to see which part of your Flink APP is blocking
the completion of Checkpoint. Specifically, you can refer to the
"Checkpoint Details" section of the document [1]. Using these methods, you
should be able to observe where the checkpoint is blocked, for example, it
may be an agg operator of the app, or it may be blocked on the sink of
kafka.
Once you know which operator is blocking, you can use FlameGraph [2] to see
where the bottleneck of the operator is. Then do specific operations.

Hope these help!
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/#checkpoint-details
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/

Best,
Guowei


On Fri, Apr 29, 2022 at 2:10 AM Sam Ch  wrote:

> Hello,
>
> I am running into checkpoint timeouts and am looking for guidance on
> troubleshooting. What should I be looking at? What configuration parameters
> would affect this? I am afraid I am a Flink newbie so I am still picking up
> the concepts. Additional notes are below, anything else I can provide?
> Thanks.
>
>
> The checkpoint size is small (less than 100kB)
> Multiple flink apps are running on a cluster, only one is running into
> checkpoint timeouts
> Timeout is set to 10 mins
> Tried aligned and unaligned checkpoints
> Tried clearing checkpoints to start fresh
> Plenty of disk space
> Dataflow: kafka source -> flink app -> kafka sink
>


Re: Checkpoint failures without exceptions

2021-10-30 Thread Arvid Heise
Hi Patrick,

do you even have so much backpressure that unaligned checkpoints are
necessary? You seem to have only one network exchange where unaligned
checkpoint helps.
The Flink 1.11 implementation of unaligned checkpoint was still
experimental and it might cause unexpected side-effects. Afaik, we fully
recommend unaligned checkpoint only from Flink 1.13 onward in production
settings.
You may also want to reduce your network buffers with aligned checkpoints
to get more reliable checkpointing times under backpressure [1].

TL;DR I would turn off unaligned checkpoints and see what happens. If you
see that unaligned checkpoints are necessary, I'd upgrade Flink.
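
For reference, a minimal sketch of that suggestion (placeholder values, not 
Patrick's actual job); the network-buffer options are only named here and would 
be tuned in flink-conf.yaml:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5 * 60 * 1000);                      // 5 minute interval
env.getCheckpointConfig().enableUnalignedCheckpoints(false); // back to aligned barriers
// To reduce in-flight data under backpressure, tune in flink-conf.yaml:
//   taskmanager.network.memory.buffers-per-channel
//   taskmanager.network.memory.floating-buffers-per-gate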

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#memory-configuration

On Wed, Oct 27, 2021 at 4:24 PM Yun Gao  wrote:

> Hi Patrick,
>
> Could you also have a look at the stack of the tasks of the
> second function to see what the main thread and netty
> thread is doing during the checkpoint period ?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:* 
> *Send Date:*Wed Oct 27 22:05:40 2021
> *Recipients:*Flink ML 
> *Subject:*Checkpoint failures without exceptions
>
>> Hi Flink Community,
>>
>>
>>
>> I have an issue with failing checkpoints on all stateful jobs in a
>> session cluster which I’m unable to track down so far. The jobs sit between
>> Kafka.
>>
>>
>>
>> Only the first checkpoint gets completed all others fail.
>>
>> The watermarks are progressing regularly and are aligned between sub
>> tasks.
>>
>> In the Flink Web UI the backpressure is showing as OK.
>>
>> The target topic gets all records outputted as expected. No exceptions
>> occurred in the jobs.
>>
>>
>>
>> The metrics for inPoolUsage and outPoolUsage show 0, but the numRecordsOut
>> of the Window Processor (written with the KeyedProcessFunction) shows the
>> expected incrementing number. The checkpoint alignment time shows 0 ms.
>>
>>
>>
>> I’m using Flink 1.11.1 and enabled unaligned checkpoints. The state and
>> the checkpoints are currently just stored in memory of the job manager node
>> – no state backend is configured. The jobmanager and task manager have
>> enough memory.
>>
>>
>>
>> The job graph has 2 process functions:
>>
>>1. Filter and enrich events
>>2. Keyby/WindowProcessor to KafkaSink
>>
>>
>>
>> The checkpoints for the first process function always gets completed. For
>> the second the checkpoints show up as 0 % acknowledged which fail the whole
>> checkpoint.
>>
>>
>>
>> Finally the checkpoint failures only happens in an environment with
>> higher load – the same jobs run fine in another env with lower load.
>>
>>
>>
>> The window duration is set to 24 hours and the checkpoints are set as
>> follows:
>>
>>
>> checkpoint-interval = 5 minutes
>>
>> min-pause-between-checkpoints = 1 minute
>>
>> checkpoint-timeout = 10 minutes
>>
>>
>>
>> The kafka source is configured with forBoundedOutOfOrderness and idleness
>> parameters.
>>
>>
>>
>> I’m wondering what am I missing here.
>>
>>
>>
>> Thanks!
>>
>>
>>
>>
>>
>


Re: Checkpoint failures without exceptions

2021-10-27 Thread Yun Gao
Hi Patrick,

Could you also have a look at the stack of the tasks of the
second function to see what the main thread and netty
thread is doing during the checkpoint period ? 

Best,
Yun



 --Original Mail --
Sender: 
Send Date:Wed Oct 27 22:05:40 2021
Recipients:Flink ML 
Subject:Checkpoint failures without exceptions

Hi Flink Community,
I have an issue with failing checkpoints on all stateful jobs in a session 
cluster which I’m unable to track down so far. The jobs sit between Kafka.

Only the first checkpoint gets completed all others fail.
The watermarks are progressing regularly and are aligned between sub tasks.
In the Flink Web UI the backpressure is showing as OK.
The target topic gets all records outputted as expected. No exceptions occurred 
in the jobs.

The metrics for inPoolUsage and outPoolUsage show 0, but the numRecordsOut of 
the Window Processor (written with the KeyedProcessFunction) shows the expected 
incrementing number. The checkpoint alignment time shows 0 ms.

I’m using Flink 1.11.1 and enabled unaligned checkpoints. The state and the 
checkpoints are currently just stored in memory of the job manager node – no 
state backend is configured. The jobmanager and task manager have enough memory.

The job graph has 2 process functions:
1. Filter and enrich events
2. Keyby/WindowProcessor to KafkaSink

The checkpoints for the first process function always gets completed. For the 
second the checkpoints show up as 0 % acknowledged which fail the whole 
checkpoint.

Finally the checkpoint failures only happens in an environment with higher load 
– the same jobs run fine in another env with lower load.

The window duration is set to 24 hours and the checkpoints are set as follows:

checkpoint-interval = 5 minutes
min-pause-between-checkpoints = 1 minute
checkpoint-timeout = 10 minutes

The kafka source is configured with forBoundedOutOfOrderness and idleness 
parameters.

I’m wondering what am I missing here.

Thanks!

 

Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Vijay Bhaskar
Since the state size is small, you can try the FsStateBackend rather than
RocksDB. You can check once; the rule of thumb is that if the FsStateBackend
performs worse, RocksDB is the better fit.
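
A minimal sketch of that suggestion for Flink 1.11 (the version Lei is running); 
the checkpoint URI is a placeholder:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// heap-based state with filesystem checkpoints, instead of RocksDBStateBackend
env.setStateBackend(new FsStateBackend("oss://my-bucket/flink/checkpoints"));
env.enableCheckpointing(60_000);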

Regards
Bhaskar

On Tue, Oct 12, 2021 at 1:47 PM Yun Tang  wrote:

> Hi Lei,
>
> RocksDB state-backend's checkpoint is composed of RocksDB's own files
> (unmodified compressed SST format files), and incremental checkpoints mean
> Flink does not upload files which were uploaded before. As you can see,
> incremental checkpoints highly depend on RocksDB's own mechanism to
> remove useless files, which is triggered by internal compaction. You should
> not worry too much about the checkpointed data size as your job consumes more
> and more records; moreover, the increase is actually quite small
> (from 1.32GB to 1.34GB).
>
> Best
> Yun Tang
>
>
>
> --
> *From:* Lei Wang 
> *Sent:* Monday, October 11, 2021 16:16
> *To:* user 
> *Subject:* Checkpoint size increasing even i enable increasemental
> checkpoint
>
>
> [image: image.png]
>
> The  checkpointed data size became bigger and bigger and the node cpu is
> very high when the job is doing checkpointing.
>  But I have enabled incremental checkpointing:  env.setStateBackend(new 
> RocksDBStateBackend(checkpointDir,
> true));
>
> I am using flink-1.11.2 and aliyun oss as checkpoint storage.
>
>
> Any insight on this?
>
> Thanks,
>
> Lei
>
>
>


Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Yun Tang
Hi Lei,

RocksDB state-backend's checkpoint is composed of RocksDB's own files 
(unmodified compressed SST format files), and incremental checkpoints mean 
Flink does not upload files which were uploaded before. As you can see, 
incremental checkpoints highly depend on RocksDB's own mechanism to remove 
useless files, which is triggered by internal compaction. You should not worry 
too much about the checkpointed data size as your job consumes more and more 
records; moreover, the increase is actually quite small (from 1.32GB to 
1.34GB).

Best
Yun Tang




From: Lei Wang 
Sent: Monday, October 11, 2021 16:16
To: user 
Subject: Checkpoint size increasing even i enable increasemental checkpoint


[image.png]

The  checkpointed data size became bigger and bigger and the node cpu is very 
high when the job is doing checkpointing.
 But I have enabled incremental checkpointing:  env.setStateBackend(new 
RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and aliyun oss as checkpoint storage.


Any insight on this?

Thanks,

Lei



Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-11 Thread Caizhi Weng
Hi!

Checkpoint sizes are highly related to your job. Incremental checkpointing
will help only when the values in the state are converging (for example a
distinct count aggregation).

If possible, could you provide your user code or explain what jobs are you
running?

Lei Wang  wrote on Mon, Oct 11, 2021 at 4:16 PM:

>
> [image: image.png]
>
> The  checkpointed data size became bigger and bigger and the node cpu is
> very high when the job is doing checkpointing.
>  But I have enabled incremental checkpointing:  env.setStateBackend(new 
> RocksDBStateBackend(checkpointDir,
> true));
>
> I am using flink-1.11.2 and aliyun oss as checkpoint storage.
>
>
> Any insight on this?
>
> Thanks,
>
> Lei
>
>
>


Re: Checkpoint is timing out - inspecting state

2021-06-16 Thread Dan Hill
Hi Yun.  The UI was not useful for this case.  I had a feeling beforehand
about what the issue was.  We refactored the state and now the checkpoint
is 10x faster.
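
For anyone hitting the same issue, a sketch of the kind of refactor that 
typically helps here (an assumption, not Dan's actual change): avoid MapState 
values that are huge, ever-growing lists - every append re-serializes the whole 
list - and give each appended element its own small state entry, e.g. via keyed 
ListState (String stands in for the appended object type):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AppendingFunction extends KeyedProcessFunction<String, String, Void> {

    private transient ListState<String> events;

    @Override
    public void open(Configuration parameters) {
        // with RocksDB, ListState.add() appends a single element instead of
        // rewriting one huge serialized value on every update
        events = getRuntimeContext().getListState(
                new ListStateDescriptor<>("events", String.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<Void> out) throws Exception {
        events.add(event);
    }
}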

On Mon, Jun 14, 2021 at 5:47 AM Yun Gao  wrote:

> Hi Dan,
>
> Flink should already have integrated a tool in the web UI to monitor
> the detailed statistics of the checkpoint [1]. It would show the time
> consumed in each part and each task, thus it could be used to debug
> the checkpoint timeout.
>
> Best,
> Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
> --Original Mail --
> *Sender:*Dan Hill 
> *Send Date:*Sat Jun 12 09:15:50 2021
> *Recipients:*user 
> *Subject:*Checkpoint is timing out - inspecting state
>
>> Hi.
>>
>> We're doing something bad with our Flink state.  We just launched a
>> feature that creates very big values (lists of objects that we append to)
>> in MapState.
>>
>> Our checkpoints time out (10 minutes).  I'm assuming the values are too
>> big.  Backpressure is okay and cpu+memory metrics look okay.
>>
>> Questions
>>
>> 1. Is there an easy tool for inspecting the Flink state?
>>
>> I found this post about drilling into Flink state.
>> I was hoping for something more like a CLI.
>>
>> 2. Is there a way to break down the time spent during a checkpoint if it
>> times out?
>>
>> Thanks!
>> - Dan
>>
>>
>>


Re: Checkpoint loading failure

2021-06-16 Thread Guowei Ma
Hi Padarn
Do these errors also occur if the jobgraph is not modified?
In addition, is this the full error stack? Is it possible that other errors
caused the stream to be closed?
Best,
Guowei


On Tue, Jun 15, 2021 at 9:54 PM Padarn Wilson  wrote:

> Hi all,
>
> We have a job that has a medium size state (around 4GB) and after adding a
> new part of the job graph (which should not impact the job too much) we
> found that every single checkpoint restore has the following error:
>
> Caused by: java.io.IOException: s3a://: Stream is closed!
>> at
>> org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
>> at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
>> at java.io.FilterInputStream.read(FilterInputStream.java:83)
>> at
>> org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
>> at
>> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
>> at
>> org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
>> at
>> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
>> at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
>> ... 17 more
>
>
> I haven't really got any clues as to what this is caused by. You'll notice we
> are using the Hadoop file system, but switching to Presto is a bit tricky
> for us because some of the bucket permissions would need to change.
>
> Anyone have tips on debugging (or solving this)?
>


Re: Checkpoint is timing out - inspecting state

2021-06-14 Thread Yun Gao
Hi Dan,

Flink should already have integrated a tool in the web UI to monitor 
the detailed statistics of the checkpoint [1]. It would show the time
consumed in each part and each task, thus it could be used to debug
the checkpoint timeout.

Best,
Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/


 --Original Mail --
Sender:Dan Hill 
Send Date:Sat Jun 12 09:15:50 2021
Recipients:user 
Subject:Checkpoint is timing out - inspecting state

Hi.

We're doing something bad with our Flink state.  We just launched a feature 
that creates very big values (lists of objects that we append to) in MapState.

Our checkpoints time out (10 minutes).  I'm assuming the values are too big.  
Backpressure is okay and cpu+memory metrics look okay.

Questions

1. Is there an easy tool for inspecting the Flink state?

I found this post about drilling into Flink state.  I was hoping for something 
more like a CLI.

2. Is there a way to break down the time spent during a checkpoint if it times 
out?

Thanks!
- Dan




Re: Checkpoint error - "The job has failed"

2021-04-28 Thread Dan Hill
Oh interesting.  Yea, could be.  We'll soon update to v1.12.  Thanks Robert
and Yun!

On Wed, Apr 28, 2021 at 1:30 AM Yun Tang  wrote:

> Hi Dan,
>
> You could refer to the "Fix Versions" in FLINK-16753 [1] and know that
> this bug is resolved after 1.11.3 not 1.11.1.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Dan Hill 
> *Sent:* Tuesday, April 27, 2021 7:50
> *To:* Yun Tang 
> *Cc:* Robert Metzger ; user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hey Yun and Robert,
>
> I'm using Flink v1.11.1.
>
> Robert, I'll send you a separate email with the logs.
>
> On Mon, Apr 26, 2021 at 12:46 AM Yun Tang  wrote:
>
> Hi Dan,
>
> I think you might be using an older version of Flink; this problem has been
> resolved by FLINK-16753 [1] after Flink 1.10.3.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> ----------
> *From:* Robert Metzger 
> *Sent:* Monday, April 26, 2021 14:46
> *To:* Dan Hill 
> *Cc:* user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hi Dan,
>
> can you provide me with the JobManager logs to take a look as well? (This
> will also tell me which Flink version you are using)
>
>
>
> On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:
>
> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>
>


Re: Checkpoint error - "The job has failed"

2021-04-28 Thread Yun Tang
Hi Dan,

You could refer to the "Fix Versions" in FLINK-16753 [1] and know that this bug 
is resolved after 1.11.3 not 1.11.1.

[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Dan Hill 
Sent: Tuesday, April 27, 2021 7:50
To: Yun Tang 
Cc: Robert Metzger ; user 
Subject: Re: Checkpoint error - "The job has failed"

Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Dan,

I think you might be using an older version of Flink; this problem has been resolved 
by FLINK-16753 [1] after Flink 1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger mailto:rmetz...@apache.org>>
Sent: Monday, April 26, 2021 14:46
To: Dan Hill mailto:quietgol...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill 
mailto:quietgol...@gmail.com>> wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Dan Hill
Hey Yun and Robert,

I'm using Flink v1.11.1.

Robert, I'll send you a separate email with the logs.

On Mon, Apr 26, 2021 at 12:46 AM Yun Tang  wrote:

> Hi Dan,
>
> I think you might be using an older version of Flink; this problem has been
> resolved by FLINK-16753 [1] after Flink 1.10.3.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16753
>
> Best
> Yun Tang
> --
> *From:* Robert Metzger 
> *Sent:* Monday, April 26, 2021 14:46
> *To:* Dan Hill 
> *Cc:* user 
> *Subject:* Re: Checkpoint error - "The job has failed"
>
> Hi Dan,
>
> can you provide me with the JobManager logs to take a look as well? (This
> will also tell me which Flink version you are using)
>
>
>
> On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:
>
> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>
>


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Yun Tang
Hi Dan,

I think you might be using an older version of Flink; this problem has been resolved 
by FLINK-16753 [1] after Flink 1.10.3.


[1] https://issues.apache.org/jira/browse/FLINK-16753

Best
Yun Tang

From: Robert Metzger 
Sent: Monday, April 26, 2021 14:46
To: Dan Hill 
Cc: user 
Subject: Re: Checkpoint error - "The job has failed"

Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This will 
also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill 
mailto:quietgol...@gmail.com>> wrote:
My Flink job failed to checkpoint with a "The job has failed" error.  The logs 
contained no other recent errors.  I keep hitting the error even if I cancel 
the jobs and restart them.  When I restarted my jobmanager and taskmanager, the 
error went away.

What error am I hitting?  It looks like there is bad state that lives outside 
the scope of a job.

How often do people restart their jobmanagers and taskmanager to deal with 
errors like this?


Re: Checkpoint error - "The job has failed"

2021-04-25 Thread Robert Metzger
Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This
will also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:

> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>


Re: Checkpoint timeouts at times of high load

2021-04-06 Thread Robert Metzger
It could very well be that your job gets stuck in a restart loop for some
reason. Can you either post the full TaskManager logs here, or try to
figure out yourself why the first checkpoint that timed out, timed out?
Backpressure or blocked operators are a common cause for this. In your
case, it could very well be that the Kafka producer is not confirming the
checkpoint due to the Kafka transactions. If backpressure is causing this,
consider enabling unaligned checkpoints. It could also be the case that
the Kafka transactions are too slow, causing backpressure and checkpoint
timeouts?!
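
For reference, the switch itself is a one-liner (placeholder interval, not 
Morgan's actual configuration); unaligned checkpoints let barriers overtake 
buffered in-flight records, so alignment under backpressure no longer dominates 
the checkpoint duration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().enableUnalignedCheckpoints();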



On Mon, Apr 5, 2021 at 9:57 AM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Thank you for the information. I have a feeling this is more to do with
> EXACTLY_ONCE kafka producers and transactions not playing nice with
> checkpoints and a timeout happens. The jobs seem to fail and hit this
> restart and fail loop. Looking in the logs, taskmanager logs grow very
> large with the same messages repeating over and over again. I've attached a
> file for this. The two lines that give me pause are:
>
>
>
> *Closing the Kafka producer with timeoutMillis = 0 ms. *
>
> *Proceeding to force close the producer since pending requests could not
> be completed within timeout 0 ms.*
>
>
> I'm not really sure which timeout this is but it looks like there is a
> timeout loop happening here.
>
>
> The Kafka producer has been configured as such (the transaction timeout
> has been set on the kafka server to match the producer):
>
>
> Properties kafkaProducerProps = new Properties();
> kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
> "360");
> kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
>  "5");
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> UUID.randomUUID().toString());
> kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
> kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
> kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 
> "33554432");
> kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
> "3");
> kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
> "12");
>
> FlinkKafkaProducer myProducer =
> new FlinkKafkaProducer<>(
> producerTopic,
> (KafkaSerializationSchema) (value, aLong) -> {
> return new ProducerRecord<>(producerTopic, value.getBytes());
> },
> kafkaProducerProps,
> Semantic.EXACTLY_ONCE,
> 10);
>
>
> And checkpoints have been configured as such:
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // configuring RocksDB state backend to use HDFS
> String backupFolder = props.getProperty("hdfs.backupFolder");
> StateBackend backend = new RocksDBStateBackend(backupFolder, true);
> env.setStateBackend(backend);
> // start a checkpoint based on supplied interval
> env.enableCheckpointing(checkpointInterval);
> // set mode to exactly-once (this is the default)
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // make sure progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
> // checkpoints have to complete within two minute, or are discarded
> env.getCheckpointConfig().setCheckpointTimeout(38);
> //env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
> // no external services which could take some time to respond, therefore 1
> // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> // enable externalized checkpoints which are deleted after job cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
>
>
> Additionally, each taskmanager has been configured with 4GB of memory,
> there is a sliding window of 10 seconds with a slide of 1 second, and the
> cluster setup is using flink native.
>
>
> Any hints would be much appreciated!
>
>
> Regards,
>
> M.
>
>
> --
> *From:* Guowei Ma 
> *Sent:* 01 April 2021 14:19
> *To:* Geldenhuys, Morgan Karl
> *Cc:* user
> *Subject:* Re: Checkpoint timeouts at times of high load
>
> Hi,
> I think there 

Re: Checkpoint timeouts at times of high load

2021-04-05 Thread Geldenhuys, Morgan Karl
Thank you for the information. I have a feeling this is more to do with 
EXACTLY_ONCE kafka producers and transactions not playing nice with checkpoints 
and a timeout happens. The jobs seem to fail and hit this restart and fail 
loop. Looking in the logs, taskmanager logs grow very large with the same 
messages repeating over and over again. I've attached a file for this. The two 
lines that give me pause are:


Closing the Kafka producer with timeoutMillis = 0 ms.

Proceeding to force close the producer since pending requests could not be 
completed within timeout 0 ms.


I'm not really sure which timeout this is but it looks like there is a timeout 
loop happening here.


The Kafka producer has been configured as such (the transaction timeout has 
been set on the kafka server to match the producer):


Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
"360");
kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
 "5");
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
UUID.randomUUID().toString());
kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true");
kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
"3");
kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
"12");

FlinkKafkaProducer myProducer =
new FlinkKafkaProducer<>(
producerTopic,
(KafkaSerializationSchema) (value, aLong) -> {
return new ProducerRecord<>(producerTopic, value.getBytes());
},
kafkaProducerProps,
Semantic.EXACTLY_ONCE,
10);


And checkpoints have been configured as such:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// configuring RocksDB state backend to use HDFS
String backupFolder = props.getProperty("hdfs.backupFolder");
StateBackend backend = new RocksDBStateBackend(backupFolder, true);
env.setStateBackend(backend);
// start a checkpoint based on supplied interval
env.enableCheckpointing(checkpointInterval);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
// checkpoints have to complete within two minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(38);
//env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
// no external services which could take some time to respond, therefore 1
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are deleted after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);


Additionally, each taskmanager has been configured with 4GB of memory, there is 
a sliding window of 10 seconds with a slide of 1 second, and the cluster setup 
is using flink native.


Any hints would be much appreciated!


Regards,

M.


________
From: Guowei Ma 
Sent: 01 April 2021 14:19
To: Geldenhuys, Morgan Karl
Cc: user
Subject: Re: Checkpoint timeouts at times of high load

Hi,
I think there are many reasons that could lead to the checkpoint timeout.
Would you like to share some detailed information of checkpoint? For example, 
the detailed checkpoint information from the web.[1]  And which Flink version 
do you use?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl 
mailto:morgan.geldenh...@tu-berlin.de>> wrote:

Hi Community,


I have a number of flink jobs running inside my session cluster with varying 
checkpoint intervals plus a large amount of operator state and in times of high 
load, the jobs fail due to checkpoint timeouts (set to 6 minutes). I can only 
assume this is because the latencies for saving checkpoints at these times of 
high load increase. I have a 30 node HDFS cluster for checkpoints... however I 
see that only 4 of these nodes are being used for storage. Is there a way of 
ensuring the load is evenly spread? Could there be another reason for these 
checkpoint timeouts? Events are being consumed from kafka, to kafka with 
EXACTLY ONCE guarantees enabled.


Thank you very much!


M.

Re: Checkpoint timeouts at times of high load

2021-04-01 Thread Guowei Ma
Hi,
I think there are many reasons that could lead to the checkpoint timeout.
Would you like to share some detailed information of checkpoint? For
example, the detailed checkpoint information from the web.[1]  And which
Flink version do you use?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi Community,
>
>
> I have a number of flink jobs running inside my session cluster with
> varying checkpoint intervals plus a large amount of operator state and in
> times of high load, the jobs fail due to checkpoint timeouts (set to 6
> minutes). I can only assume this is because the latencies for saving
> checkpoints at these times of high load increase. I have a 30 node HDFS
> cluster for checkpoints... however I see that only 4 of these nodes are
> being used for storage. Is there a way of ensuring the load is evenly
> spread? Could there be another reason for these checkpoint timeouts? Events
> are being consumed from kafka, to kafka with EXACTLY ONCE guarantees
> enabled.
>
>
> Thank you very much!
>
>
> M.
>


Re: Checkpoint fail due to timeout

2021-03-30 Thread Alexey Trenikhun
Hi Piotrek,

I can't reproduce the problem anymore. Before, the problem happened 2-3 times in 
a row; I had turned off unaligned checkpoints and have now turned them back on, 
but the problem seems gone for now. When the problem happened there was no 
progress on the source operators; I thought maybe it was by design, that after 
the acknowledgment the source doesn't produce anything until the checkpoint 
completes... I also have a union of Kafka sources (~50 partitions each), so 
maybe it is the same as [1]

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Source-Operators-Stuck-in-the-requestBufferBuilderBlocking-td42530.html


From: Piotr Nowojski 
Sent: Tuesday, March 23, 2021 5:31 AM
To: Alexey Trenikhun 
Cc: Arvid Heise ; ChangZhuo Chen (陳昌倬) ; 
ro...@apache.org ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Hi Alexey,

You should definitely investigate why the job is stuck.
1. First of all, is it completely stuck, or is something moving? - Use Flink 
metrics [1] (number of bytes/records processed), and go through all of the 
operators/tasks to check this.
2. The stack traces like the one you quoted:
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
you can most likely ignore. Such Task ("Legacy Source Thread - Source: 
digital-itx-eastus2 -> Filter (6/6)#0") is backpressured and the problem lies 
downstream.
3. To check what tasks are backpressured, you can also use Flink metrics - 
check "isBackPressured" metric. Again, back pressured tasks are most likely not 
the source of the problem. Check downstream from the back pressured task.
4. The first (most upstream) non-backpressured task, which is 
accepting/processing data from some backpressured tasks, is the interesting one. 
It is causing the backpressure and you need to investigate what the problem is. Take 
a look at its stack traces, maybe attach a remote profiler and profile its 
code (if it's making slow progress). Maybe it's stuck in your user code doing 
something.

Please let us know what you have found out.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

On Mon, Mar 22, 2021 at 19:18 Alexey Trenikhun <yen...@msn.com> wrote:
Great! I doubt that it will help in my case however, since in my case even 
unaligned checkpoints get “stuck”. Unlike with aligned checkpoints, after an 
unaligned checkpoint is triggered, Flink at some moment becomes idle (Kubernetes 
metrics report very little CPU usage by the container), but the unaligned checkpoint 
still times out after 3 hours.


From: Arvid Heise <ar...@apache.org>
Sent: Monday, March 22, 2021 6:58:20 AM
To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
Cc: Alexey Trenikhun <yen...@msn.com>; ro...@apache.org; Flink User Mail List 
<user@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Hi Alexey,

rescaling from unaligned checkpoints will be supported with the upcoming 1.13 
release (expected at the end of April).

Best,

Arvid

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and a new parallelism shall do the
trick.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-30 Thread Alexey Trenikhun
I also expected an improvement in checkpointing at the cost of throughput, but in 
reality I didn't notice a difference in either checkpointing or throughput. 
The backlog was purged by Kafka, so I can't post a thread dump right now, but I doubt 
that the problem is gone, so I will have another chance during the next performance run.

Thanks,
Alexey


From: Roman Khachatryan 
Sent: Tuesday, March 23, 2021 12:17 AM
To: Alexey Trenikhun 
Cc: ChangZhuo Chen (陳昌倬) ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is it what you see?

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x2af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun  wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on 
> each record; it inverted the behavior: now the Legacy Source thread waits for 
> the checkpointLock, while the Source thread is requesting a memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 
> BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> 
> Filter (6/6)#0" Id=199
> at 
> com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
> -  blocked on java.lang.Object@2af646cc
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) 
> Cc: Alexey Trenikhun ; Flink User Mail List 
> 
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  
> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

Re: Checkpoint fail due to timeout

2021-03-23 Thread Piotr Nowojski
Hi Alexey,

You should definitely investigate why the job is stuck.
1. First of all, is it completely stuck, or is something moving? - Use
Flink metrics [1] (number bytes/records processed), and go through all of
the operators/tasks to check this.
2. The stack traces like the one you quoted:
> at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
you can most likely ignore. Such Task ("Legacy Source Thread - Source:
digital-itx-eastus2 -> Filter (6/6)#0") is backpressured and the problem
lies downstream.
3. To check what tasks are backpressured, you can also use Flink metrics -
check "isBackPressured" metric. Again, back pressured tasks are most
likely not the source of the problem. Check downstream from the back
pressured task.
4. The first (most upstream) non-backpressured task, which is
accepting/processing data from some backpressured tasks, is the interesting
one. It is causing the backpressure and you need to investigate what the
problem is. Take a look at its stack traces, maybe attach a remote profiler
and profile its code (if it's making slow progress). Maybe it's stuck in
your user code doing something.

Please let us know what you have found out.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
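
For completeness, the back-pressure status Piotr mentions can also be sampled through the REST API (it is the same data the web UI's Back Pressure tab shows). A rough Java sketch; the endpoint path below is how I recall the 1.12 REST API and may differ between versions, and the address, job ID and vertex ID are placeholders passed as arguments:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackPressureProbe {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://localhost:8081"; // placeholder: JobManager REST address
        String jobId = args[0];                       // from GET /jobs
        String vertexId = args[1];                    // from GET /jobs/<jobId>
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                restAddress + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure")).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // Per-subtask back-pressure level ("ok", "low", "high") and ratio as JSON.
        System.out.println(response.body());
    }
}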

On Mon, Mar 22, 2021 at 19:18 Alexey Trenikhun  wrote:

> Great! I doubt that it will help in my case however, since in my case even
> unaligned checkpoints get “stuck”. Unlike with aligned checkpoints,
> after an unaligned checkpoint is triggered, Flink at some moment becomes idle
> (Kubernetes metrics report very little CPU usage by the container), but the
> unaligned checkpoint still times out after 3 hours.
>
> --
> *From:* Arvid Heise 
> *Sent:* Monday, March 22, 2021 6:58:20 AM
> *To:* ChangZhuo Chen (陳昌倬) 
> *Cc:* Alexey Trenikhun ; ro...@apache.org <
> ro...@apache.org>; Flink User Mail List 
> *Subject:* Re: Checkpoint fail due to timeout
>
> Hi Alexey,
>
> rescaling from unaligned checkpoints will be supported with the upcoming
> 1.13 release (expected at the end of April).
>
> Best,
>
> Arvid
>
> On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) 
> wrote:
>
> On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> > In my opinion looks similar. Were you able to tune-up Flink to make it
> work? I'm stuck with it, I wanted to scale up hoping to reduce
> backpressure, but to rescale I need to take savepoint, which never
> completes (at least takes longer than 3 hours).
>
> You can use an aligned checkpoint to scale your job. Just restarting from
> the checkpoint with the same jar file and a new parallelism shall do the
> trick.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>
>


Re: Checkpoint fail due to timeout

2021-03-23 Thread Roman Khachatryan
Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is it what you see?

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x2af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?



Regards,
Roman
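
As a side note on producing the full thread dump Roman asks for: jstack -l <pid> on the TaskManager process prints stack traces together with the "locked <0x...>" monitor information. The same can be approximated from inside the JVM with ThreadMXBean; a minimal sketch (ThreadInfo.toString() truncates long stacks, so the frames are printed explicitly here):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {
    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // true/true also collects locked monitors and ownable synchronizers (the lock-owner lines).
        for (ThreadInfo info : threads.dumpAllThreads(true, true)) {
            System.out.printf("\"%s\" Id=%d %s%n",
                    info.getThreadName(), info.getThreadId(), info.getThreadState());
            for (StackTraceElement frame : info.getStackTrace()) {
                System.out.println("    at " + frame);
            }
            System.out.println();
        }
    }
}

Note that this dumps the threads of the JVM it runs in, so it would have to run inside the TaskManager (for example from user code or an agent); jstack from the outside is usually the simpler option.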

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun  wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on 
> each record; it inverted the behavior: now the Legacy Source thread waits for 
> the checkpointLock, while the Source thread is requesting a memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 
> BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> 
> Filter (6/6)#0" Id=199
> at 
> com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
> -  blocked on java.lang.Object@2af646cc
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) 
> Cc: Alexey Trenikhun ; Flink User Mail List 
> 
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  
> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > > -  blocked on java.lang.Object@5366a0e2
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTa

Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on 
each record; it inverted the behavior: now the Legacy Source thread waits for 
the checkpointLock, while the Source thread is requesting a memorySegment.

"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on 
java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
at sun.misc.Unsafe.park(Native Method)
-  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
...

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 
BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> 
Filter (6/6)#0" Id=199
at 
com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
-  blocked on java.lang.Object@2af646cc
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Thanks,
Alexey

From: Roman Khachatryan 
Sent: Monday, March 22, 2021 1:36 AM
To: ChangZhuo Chen (陳昌倬) 
Cc: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > -  blocked on java.lang.Object@5366a0e2
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" 
> > Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at sun.misc.Unsafe.park(Native Method)
> > -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at 
> > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > at 
> > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > at 
> > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > at 
> > java.util.concurrent.CompletableFuture.get(Comp

Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
Would it help if the checkpoint lock were a fair lock? It looks strange: the downstream 
produces output, so I assume that at some moment buffers become available, but the lock 
can’t be acquired for 3+ hours


From: Roman Khachatryan 
Sent: Monday, March 22, 2021 1:36:35 AM
To: ChangZhuo Chen (陳昌倬) 
Cc: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > -  blocked on java.lang.Object@5366a0e2
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" 
> > Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at sun.misc.Unsafe.park(Native Method)
> > -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at 
> > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > at 
> > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > at 
> > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > at 
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > at 
> > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > at 
> > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see 
> > multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing 
> > blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-22 Thread Alexey Trenikhun
Great! I doubt that it will help in my case however, since in my case even 
unaligned checkpoints get “stuck”. Unlike with aligned checkpoints, after an 
unaligned checkpoint is triggered, Flink at some moment becomes idle (Kubernetes 
metrics report very little CPU usage by the container), but the unaligned checkpoint 
still times out after 3 hours.


From: Arvid Heise 
Sent: Monday, March 22, 2021 6:58:20 AM
To: ChangZhuo Chen (陳昌倬) 
Cc: Alexey Trenikhun ; ro...@apache.org ; 
Flink User Mail List 
Subject: Re: Checkpoint fail due to timeout

Hi Alexey,

rescaling from unaligned checkpoints will be supported with the upcoming 1.13 
release (expected at the end of April).

Best,

Arvid

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and a new parallelism shall do the
trick.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-22 Thread Arvid Heise
Hi Alexey,

rescaling from unaligned checkpoints will be supported with the upcoming
1.13 release (expected at the end of April).

Best,

Arvid
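
For reference, a minimal sketch of how unaligned checkpoints are switched on through the DataStream API; the values are illustrative, and the rescaling limitation discussed here applies to releases before 1.13:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example interval
        // Let checkpoint barriers overtake buffered in-flight data so that
        // backpressure does not delay the barriers themselves.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); // example timeout
        // ... build the job topology here and call env.execute(...)
    }
}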

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) 
wrote:

> On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> > In my opinion looks similar. Were you able to tune-up Flink to make it
> work? I'm stuck with it, I wanted to scale up hoping to reduce
> backpressure, but to rescale I need to take savepoint, which never
> completes (at least takes longer than 3 hours).
>
> You can use an aligned checkpoint to scale your job. Just restarting from
> the checkpoint with the same jar file and a new parallelism shall do the
> trick.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Checkpoint fail due to timeout

2021-03-22 Thread Roman Khachatryan
Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > -  blocked on java.lang.Object@5366a0e2
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" 
> > Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at sun.misc.Unsafe.park(Native Method)
> > -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at 
> > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > at 
> > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > at 
> > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > at 
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > at 
> > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > at 
> > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see 
> > multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing 
> > blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-17 Thread Alexey Trenikhun
According to [1], checkpoints do not support Flink-specific features like 
rescaling, but I can try. Thank you for the suggestions.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#difference-to-savepoints



From: ChangZhuo Chen (陳昌倬)
Sent: Wednesday, March 17, 2021 12:29 AM
To: Alexey Trenikhun
Cc: ro...@apache.org; Flink User Mail List
Subject: Re: Checkpoint fail due to timeout

On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and a new parallelism shall do the
trick.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-17 Thread 陳昌倬
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from
the checkpoint with the same jar file and a new parallelism shall do the
trick.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Checkpoint fail due to timeout

2021-03-16 Thread Alexey Trenikhun
In my opinion it looks similar. Were you able to tune Flink to make it work? 
I'm stuck with it; I wanted to scale up hoping to reduce backpressure, but to 
rescale I need to take a savepoint, which never completes (or at least takes longer 
than 3 hours).



From: ChangZhuo Chen (陳昌倬)
Sent: Tuesday, March 16, 2021 6:59 AM
To: Alexey Trenikhun
Cc: ro...@apache.org; Flink User Mail List
Subject: Re: Checkpoint fail due to timeout

On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -  blocked on java.lang.Object@5366a0e2
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple 
> threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
> different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-16 Thread 陳昌倬
On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -  blocked on java.lang.Object@5366a0e2
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> 
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple 
> threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
> different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Checkpoint fail due to timeout

2021-03-15 Thread Alexey Trenikhun
Hi Roman,
I took thread dump:
"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
digital-itx-eastus2 -> Filter (6/6)#0" Id=202
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-  blocked on java.lang.Object@5366a0e2
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
at sun.misc.Unsafe.park(Native Method)
-  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)

Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple 
threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
different Objects.

Thanks,
Alexey

From: Roman Khachatryan 
Sent: Monday, March 15, 2021 2:16 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Checkpoint fail due to timeout

Hello Alexey,

Thanks for the details.

It looks like backpressure is indeed the cause of the issue.
You can check that by looking at the (succeeded) checkpoint start
delay in the tasks following the suspected source
(digital-itx-eastus2?).
To be sure, you can take a thread dump (or profile) those sources: the
task thread should be waiting for checkpoint lock; while the legacy
source thread should be holding it and waiting to output data.

One way to deal with this is to use the new Kafka source (based on
FLIP-27) which will hopefully be released in 1.13 (it is an
experimental feature in 1.12).

Regards,
Roman

On Fri, Mar 12, 2021 at 8:43 PM Alexey Trenikhun  wrote:
>
> Hello Roman,
>
>  history, details and summary stats are attached.
> There is backpressure on all sources except Source:gca-cfg and 
> Source:heartbeat
> Flink version 1.12.1; I also tried 1.12.2 with the same results
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Thursday, March 11, 2021 11:49 PM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List 
> Subject: Re: Checkpoint fail due to timeout
>
> Hello,
>
> This can be caused by several reasons such as back-pressure, large
> snapshots or bugs.
>
> Could you please share:
> - the stats of the previous (successful) checkpoints
> - back-pressure metrics for sources
> - which Flink version do you use?
>
> Regards,
> Roman
>
>
> On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun  wrote:
> >
> > Hello,
> > We are experiencing the problem with checkpoints failing due to timeout 
> > (already set to 30 minute, still failing), checkpoints were not too big 
> > before they started to fail, around 1.2Gb. Looks like one of sources 
> > (Kafka) never acknowledged (see attached screenshot). What could be the 
> > reason?
> >
> > Thanks,
> > Alexey
> >
> >


Re: Checkpoint fail due to timeout

2021-03-15 Thread Roman Khachatryan
Hello Alexey,

Thanks for the details.

It looks like backpressure is indeed the cause of the issue.
You can check that by looking at the (succeeded) checkpoint start
delay in the tasks following the suspected source
(digital-itx-eastus2?).
To be sure, you can take a thread dump (or profile) those sources: the
task thread should be waiting for checkpoint lock; while the legacy
source thread should be holding it and waiting to output data.

One way to deal with this is to use the new Kafka source (based on
FLIP-27) which will hopefully be released in 1.13 (it is an
experimental feature in 1.12).

Regards,
Roman
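
For reference, a rough sketch of the FLIP-27 Kafka source that Roman mentions (experimental in 1.12, stabilized in later releases). Broker, topic and group ID are placeholders, and the builder calls reflect the published connector API, not the poster's job:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example interval

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("input-topic")             // placeholder
                .setGroupId("example-group")          // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // The new source runs in the task's mailbox thread, so there is no separate
        // legacy source thread holding the checkpoint lock.
        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
        events.print();
        env.execute("new-kafka-source-sketch");
    }
}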

On Fri, Mar 12, 2021 at 8:43 PM Alexey Trenikhun  wrote:
>
> Hello Roman,
>
>  history, details and summary stats are attached.
> There is backpressure on all sources except Source:gca-cfg and 
> Source:heartbeat
> Flink version 1.12.1; I also tried 1.12.2 with the same results
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Thursday, March 11, 2021 11:49 PM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List 
> Subject: Re: Checkpoint fail due to timeout
>
> Hello,
>
> This can be caused by several reasons such as back-pressure, large
> snapshots or bugs.
>
> Could you please share:
> - the stats of the previous (successful) checkpoints
> - back-pressure metrics for sources
> - which Flink version do you use?
>
> Regards,
> Roman
>
>
> On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun  wrote:
> >
> > Hello,
> > We are experiencing the problem with checkpoints failing due to timeout 
> > (already set to 30 minute, still failing), checkpoints were not too big 
> > before they started to fail, around 1.2Gb. Looks like one of sources 
> > (Kafka) never acknowledged (see attached screenshot). What could be the 
> > reason?
> >
> > Thanks,
> > Alexey
> >
> >


Re: Checkpoint fail due to timeout

2021-03-11 Thread Roman Khachatryan
Hello,

This can be caused by several reasons such as back-pressure, large
snapshots or bugs.

Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for sources
- which Flink version do you use?

Regards,
Roman


On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun  wrote:
>
> Hello,
> We are experiencing a problem with checkpoints failing due to timeout 
> (already set to 30 minutes, still failing); the checkpoints were not too big 
> before they started to fail, around 1.2 GB. It looks like one of the sources (Kafka) 
> never acknowledged (see the attached screenshot). What could be the reason?
>
> Thanks,
> Alexey
>
>


Re: Re: Re: Checkpoint Error

2021-03-10 Thread Till Rohrmann
Could it be that another process might have deleted the in-progress
checkpoint file?

Cheers,
Till

On Mon, Mar 8, 2021 at 4:31 PM Yun Gao  wrote:

> Hi Navneeth,
>
> Is the attached exception the root cause for the checkpoint failure ?
> Namely is it also reported in job manager log?
>
> Also, have you enabled concurrent checkpoint?
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Mon Mar 8 13:10:46 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Re: Checkpoint Error
>
>> Hi Yun,
>>
>> Thanks for the response. I checked the mounts and only the JM's and TM's
>> are mounted with this EFS. Not sure how to debug this.
>>
>> Thanks
>>
>> On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:
>>
>>> Hi Navneeth,
>>>
>>> It seems from the stack that the exception is caused by the underlying
>>> EFS problems ? Have you checked
>>> if there are errors reported for EFS, or if there might be duplicate
>>> mounting for the same EFS and others
>>> have ever deleted the directory?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Navneeth Krishnan 
>>> *Send Date:*Sun Mar 7 15:44:59 2021
>>> *Recipients:*user 
>>> *Subject:*Re: Checkpoint Error
>>>
>>>> Hi All,
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>>>> reachnavnee...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We are running our streaming job on flink 1.7.2 and we are noticing
>>>>> the below error. Not sure what's causing it, any pointers would help. We
>>>>> have 10 TM's checkpointing to AWS EFS.
>>>>>
>>>>> AsynchronousException{java.lang.Exception: Could not materialize 
>>>>> checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)at
>>>>>  
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)at
>>>>>  
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)at
>>>>>  
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at 
>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at
>>>>>  
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at
>>>>>  java.lang.Thread.run(Thread.java:748)Caused by: java.lang.Exception: 
>>>>> Could not materialize checkpoint 11 for operator Processor -> Sink: 
>>>>> KafkaSink (34/42).at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)...
>>>>>  6 moreCaused by: java.util.concurrent.ExecutionException: 
>>>>> java.io.IOException: Could not flush and close the file system output 
>>>>> stream to 
>>>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>>>>  in order to obtain the stream state handleat 
>>>>> java.util.concurrent.FutureTask.report(FutureTask.java:122)at 
>>>>> java.util.concurrent.FutureTask.get(FutureTask.java:192)at 
>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)at 
>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)at
>>>>>  
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)...
>>>>>  5 moreCaused by: java.io.IOException: Could not flush and close the file 
>>>>> system output stream to 
>>>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>>>>  in order to obtain the stream state handleat 
>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointSt

Re: Re: Re: Checkpoint Error

2021-03-08 Thread Yun Gao
Hi Navneeth,

Is the attached exception the root cause of the checkpoint failure?
Namely, is it also reported in the job manager log?

Also, have you enabled concurrent checkpoints?

Best,
 Yun
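
For reference, concurrent checkpoints are controlled through CheckpointConfig; a minimal sketch (illustrative values) of pinning a job to a single in-flight checkpoint with a pause between checkpoints:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConcurrencySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);                                 // example interval
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // only one checkpoint in flight
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // breathing room between checkpoints
        // ... job topology here, then env.execute(...)
    }
}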



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Mon Mar 8 13:10:46 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Checkpoint Error

Hi Yun,

Thanks for the response. I checked the mounts and only the JM's and TM's are 
mounted with this EFS. Not sure how to debug this.

Thanks
On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

Hi Navneeth,

It seems from the stack that the exception is caused by the underlying EFS 
problems ? Have you checked
if there are errors reported for EFS, or if there might be duplicate mounting 
for the same EFS and others
have ever deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 
for operator Processor -> Sink: KafkaSink (34/42).}at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)at
 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)at
 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at
 java.lang.Thread.run(Thread.java:748)Caused by: java.lang.Exception: Could not 
materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)...
 6 moreCaused by: java.util.concurrent.ExecutionException: java.io.IOException: 
Could not flush and close the file system output stream to 
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
 in order to obtain the stream state handleat 
java.util.concurrent.FutureTask.report(FutureTask.java:122)at 
java.util.concurrent.FutureTask.get(FutureTask.java:192)at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)at
 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)...
 5 moreCaused by: java.io.IOException: Could not flush and close the file 
system output stream to 
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
 in order to obtain the stream state handleat 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)at
 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)at
 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)at
 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)at
 java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)... 7 
moreCaused by: java.io.IOException: Stale file handleat 
java.io.FileOutputStream.close0(Native Method)at 
java.io.FileOutputStream.access$000(FileOutputStream.java:53)at 
java.io.FileOutputStream$1.close(FileOutputStream.java:356)at 
java.io.FileDescriptor.closeAll(FileDescriptor.java:212)at 
java.io.FileOutputStream.close(FileOutputStream.java:354)at 
org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)at
 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)at
 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)...
 12 more

Thanks

Re: Re: Checkpoint Error

2021-03-07 Thread Navneeth Krishnan
Hi Yun,

Thanks for the response. I checked the mounts and only the JMs and TMs
are mounted with this EFS. Not sure how to debug this.

Thanks

On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

> Hi Navneeth,
>
> It seems from the stack that the exception is caused by the underlying EFS
> problems ? Have you checked
> if there are errors reported for EFS, or if there might be duplicate
> mounting for the same EFS and others
> have ever deleted the directory?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Sun Mar 7 15:44:59 2021
> *Recipients:*user 
> *Subject:*Re: Checkpoint Error
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We are running our streaming job on flink 1.7.2 and we are noticing the
>>> below error. Not sure what's causing it, any pointers would help. We have
>>> 10 TM's checkpointing to AWS EFS.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
>>> 11 for operator Processor -> Sink: KafkaSink (34/42).}at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)at
>>>  
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)at
>>>  
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)at
>>>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at 
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at
>>>  
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at
>>>  java.lang.Thread.run(Thread.java:748)Caused by: java.lang.Exception: Could 
>>> not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink 
>>> (34/42).at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)...
>>>  6 moreCaused by: java.util.concurrent.ExecutionException: 
>>> java.io.IOException: Could not flush and close the file system output 
>>> stream to 
>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>>  in order to obtain the stream state handleat 
>>> java.util.concurrent.FutureTask.report(FutureTask.java:122)at 
>>> java.util.concurrent.FutureTask.get(FutureTask.java:192)at 
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)at
>>>  
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)...
>>>  5 moreCaused by: java.io.IOException: Could not flush and close the file 
>>> system output stream to 
>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>>  in order to obtain the stream state handleat 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)at
>>>  
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)at
>>>  
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)at
>>>  
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)at
>>>  java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)... 
>>> 7 moreCaused by: java.io.IOException: Stale file handleat 
>>> java.io.FileOutputStream.close0(Native Method)at 
>>> java.io.FileOutputStream.access$000(FileOutputStream.java:53)at 
>>> java.io.FileOutputStream$1.close(FileOutputStream.java:356)at 
>>> java.io.FileDescriptor.closeAll(FileDescriptor.java:212)at 
>>> java.io.FileOutputStream.close(FileOutputStream.java:354)at 
>>> org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)at
>>>  
>>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)at
>>>  
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)...
>>>  12 more
>>>
>>>
>>> Thanks
>>>
>>>


Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth,

It seems from the stack that the exception is caused by underlying EFS 
problems. Have you checked
whether there are errors reported for EFS, or whether there might be a duplicate 
mount of the same EFS
and something else has deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below 
error. Not sure what's causing it, any pointers would help. We have 10 TM's 
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
 ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
 at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
 at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
 ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
 at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
 at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
 at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
 at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
 ... 7 more
Caused by: java.io.IOException: Stale file handle
 at java.io.FileOutputStream.close0(Native Method)
 at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
 at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
 at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
 at java.io.FileOutputStream.close(FileOutputStream.java:354)
 at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
 at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
 at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
 ... 12 more

Thanks

Re: Checkpoint Error

2021-03-06 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks

On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> We are running our streaming job on flink 1.7.2 and we are noticing the
> below error. Not sure what's causing it, any pointers would help. We have
> 10 TM's checkpointing to AWS EFS.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 11 for operator Processor -> Sink: KafkaSink (34/42).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for 
> operator Processor -> Sink: KafkaSink (34/42).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>   ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>   ... 7 more
> Caused by: java.io.IOException: Stale file handle
>   at java.io.FileOutputStream.close0(Native Method)
>   at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>   at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>   at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>   at java.io.FileOutputStream.close(FileOutputStream.java:354)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>   ... 12 more
>
>
> Thanks
>
>


Re: Checkpoint problem in 1.12.0

2021-02-03 Thread Till Rohrmann
Thanks for reaching out to the Flink community. I will respond on the JIRA
ticket.

Cheers,
Till

On Wed, Feb 3, 2021 at 1:59 PM simpleusr  wrote:

> Hi
>
> I am trying to upgrade from 1.5.5 to 1.12 and the checkpointing mechanism seems
> to be broken in our Kafka-connector-sourced DataStream jobs.
>
> Since there is a significant version gap and there are many backwards-incompatible
> / deprecated changes in the Flink runtime between versions, I had to modify our
> jobs, and I noticed that checkpoint offsets are not committed to Kafka for source
> connectors.
>
> To simplify the issue I created simple reproducer projects:
>
> https://github.com/simpleusr/flink_problem_1.5.5
>
> https://github.com/simpleusr/flink_problem_1.12.0
>
> It seems that there are major changes in the checkpoint infrastructure.
>
> For 1.5.5 checkpoint cycles works as expected as can be seen from the logs
> (please note that sample project contains a small hack in
> org.apache.flink.runtime.minicluster.MiniCluster which prevents cluster
> from
> stopping) :
>
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> 
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
>
> However for 1.12.0 checkpoint cycles stuck at initial checkpoint:
>
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
>
> As far as I see, checkpoint cycle is stuck at waiting in
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
> coordinatorCheckpointsComplete although coordinatorsToCheckpoint is
> empty...
>
>
> final CompletableFuture coordinatorCheckpointsComplete =
>     pendingCheckpointCompletableFuture
>         .thenComposeAsync(
>             (pendingCheckpoint) ->
>                 OperatorCoordinatorCheckpoints
>                     .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>                         coordinatorsToCheckpoint, pendingCheckpoint, timer),
>             timer);
>
>
> Simply returning early from
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
> when coordinatorsToCheckpoint is empty seems to resolve the problem (an editorial
> sketch of that early return follows at the end of this message):
>
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
>
> I have also created an issue for this
>
> https://issues.apache.org/jira/browse/FLINK-21248
>
>
> Please help me if I am missing something or there is another solution
> without code change.
>
> We need to perform the upgrade and modify our jobs as soon as possible (I
> hope other breaking changes do not happen) so any help will be
> appreciated..
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
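A minimal, self-contained illustration of the early-return workaround described in this
message (editor's sketch, not the actual Flink source: the class and interface below are
placeholders, and the real method in OperatorCoordinatorCheckpoints has a different
signature):

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CoordinatorCheckpointSketch {

    // Placeholder for Flink's operator-coordinator checkpoint context.
    interface CoordinatorContext {
        CompletableFuture<Void> triggerCheckpoint();
    }

    static CompletableFuture<Void> triggerAll(Collection<CoordinatorContext> coordinators) {
        if (coordinators.isEmpty()) {
            // The early return suggested above: with no coordinators to checkpoint,
            // complete immediately so the checkpoint can proceed to the task snapshots.
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.allOf(
                coordinators.stream()
                        .map(CoordinatorContext::triggerCheckpoint)
                        .toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = triggerAll(List.of());
        System.out.println("completed immediately: " + done.isDone());
    }
}

The point of the sketch is only the guard on the empty collection; whether that is the
right fix is for the FLINK-21248 discussion referenced above.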


Re: Re: checkpoint delay consume message

2021-01-05 Thread Arvid Heise
There seems to be a double-post with the mail "Long latency when consuming
a message from KAFKA and checkpoint is enabled". Let's continue discussion
there.

On Sun, Dec 27, 2020 at 8:33 AM nick toker  wrote:

> Hi,
> Hi,  We think  we are using the default values unless we are missing
> something.
> So this doesn't explain the problem we are facing.
> Could you please tell us how to choose synchronous or asynchronous
> checkpoints just to be sure we are using the correct configuration ?
> BR,
> Nick
>
> ‫בתאריך יום ה׳, 24 בדצמ׳ 2020 ב-3:36 מאת ‪lec ssmi‬‏ <‪
> shicheng31...@gmail.com‬‏>:‬
>
>> Checkpoint can be done synchronously and  asynchronously,  the latter is
>> the default .
>> If you chooese  the synchronous way , it may cause this problem.
>>
>> nick toker  于2020年12月23日周三 下午3:53写道:
>>
>>> Hi Yun,
>>>
>>> Sorry but we didn't understand your questions.
>>> The delay we are experiencing is on the *read* side.
>>> The message is written to kafka topic and consumed by flink with a delay
>>> that depends on the checkpoints interval
>>> When we disabled the checkpoints the messages are immediately consumed
>>> We use the EXACTLY-ONCE semantic.
>>>
>>> Please advise.
>>> BR,
>>> Nick
>>>
>>> ‫בתאריך יום ג׳, 22 בדצמ׳ 2020 ב-9:32 מאת ‪Yun Gao‬‏ <‪
>>> yungao...@aliyun.com‬‏>:‬
>>>
>>>> Hi nick,
>>>>
>>>>Sorry I initially think that the data is also write into Kafka with
>>>> flink . So it could be ensured that there is no delay in the write side,
>>>> right ? Does the delay in the read side keeps existing ?
>>>>
>>>> Best,
>>>>  Yun
>>>>
>>>>
>>>>
>>>> --Original Mail --
>>>> *Sender:*nick toker 
>>>> *Send Date:*Tue Dec 22 01:43:50 2020
>>>> *Recipients:*Yun Gao 
>>>> *CC:*user 
>>>> *Subject:*Re: checkpoint delay consume message
>>>>
>>>>> hi
>>>>>
>>>>> i am confused
>>>>>
>>>>> the delay in in the source when reading message not on the sink
>>>>>
>>>>> nick
>>>>>
>>>>> ‫בתאריך יום ב׳, 21 בדצמ׳ 2020 ב-18:12 מאת ‪Yun Gao‬‏ <‪
>>>>> yungao...@aliyun.com‬‏>:‬
>>>>>
>>>>>>  Hi Nick,
>>>>>>
>>>>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>>>>> transactions, and only commit the transaction on checkpoint complete to
>>>>>> ensure end-to-end exactly-once. A detailed description could be find in 
>>>>>> [1]
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>  Yun
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>>>
>>>>>> --
>>>>>> Sender:nick toker
>>>>>> Date:2020/12/21 23:52:34
>>>>>> Recipient:user
>>>>>> Theme:checkpoint delay consume message
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We noticed the following behavior:
>>>>>> If we enable the flink checkpoints, we saw that there is a delay
>>>>>> between the time we write a message to the KAFKA topic and the time the
>>>>>> flink kafka connector consumes this message.
>>>>>> The delay is closely related to checkpointInterval and/or
>>>>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>>>>> message from KAFKA will be one of these parameters.
>>>>>>
>>>>>> Could you please advise how we can remove/control this delay?
>>>>>>
>>>>>> we use flink 1.11.2
>>>>>>
>>>>>> BR
>>>>>> nick
>>>>>>
>>>>>>

-- 

Arvid Heise | Senior Java Developer



Re: Re: checkpoint delay consume message

2020-12-26 Thread nick toker
Hi,
We think we are using the default values, unless we are missing something,
so this doesn't explain the problem we are facing.
Could you please tell us how to choose between synchronous and asynchronous
checkpoints, just to be sure we are using the correct configuration?
BR,
Nick
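For reference, a minimal sketch (editor's addition, not from the original thread) of where
the synchronous/asynchronous choice is usually made, assuming the Flink 1.11 FsStateBackend
API; the path and interval below are placeholders, and newer releases expose this differently:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncSnapshotConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // placeholder interval
        // The second constructor argument toggles asynchronous snapshots for the
        // heap-based backends; true (asynchronous) is the default. The RocksDB
        // backend takes its snapshots asynchronously in any case.
        env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints", true)); // placeholder path
    }
}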

On Thu, Dec 24, 2020 at 3:36, lec ssmi <shicheng31...@gmail.com> wrote:

> Checkpoint can be done synchronously and  asynchronously,  the latter is
> the default .
> If you chooese  the synchronous way , it may cause this problem.
>
> nick toker  于2020年12月23日周三 下午3:53写道:
>
>> Hi Yun,
>>
>> Sorry but we didn't understand your questions.
>> The delay we are experiencing is on the *read* side.
>> The message is written to kafka topic and consumed by flink with a delay
>> that depends on the checkpoints interval
>> When we disabled the checkpoints the messages are immediately consumed
>> We use the EXACTLY-ONCE semantic.
>>
>> Please advise.
>> BR,
>> Nick
>>
>> ‫בתאריך יום ג׳, 22 בדצמ׳ 2020 ב-9:32 מאת ‪Yun Gao‬‏ <‪
>> yungao...@aliyun.com‬‏>:‬
>>
>>> Hi nick,
>>>
>>>Sorry I initially think that the data is also write into Kafka with
>>> flink . So it could be ensured that there is no delay in the write side,
>>> right ? Does the delay in the read side keeps existing ?
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> --Original Mail --
>>> *Sender:*nick toker 
>>> *Send Date:*Tue Dec 22 01:43:50 2020
>>> *Recipients:*Yun Gao 
>>> *CC:*user 
>>> *Subject:*Re: checkpoint delay consume message
>>>
>>>> hi
>>>>
>>>> i am confused
>>>>
>>>> the delay in in the source when reading message not on the sink
>>>>
>>>> nick
>>>>
>>>> ‫בתאריך יום ב׳, 21 בדצמ׳ 2020 ב-18:12 מאת ‪Yun Gao‬‏ <‪
>>>> yungao...@aliyun.com‬‏>:‬
>>>>
>>>>>  Hi Nick,
>>>>>
>>>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>>>> transactions, and only commit the transaction on checkpoint complete to
>>>>> ensure end-to-end exactly-once. A detailed description could be find in 
>>>>> [1]
>>>>>
>>>>>
>>>>> Best,
>>>>>  Yun
>>>>>
>>>>>
>>>>> [1]
>>>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>>
>>>>> --
>>>>> Sender:nick toker
>>>>> Date:2020/12/21 23:52:34
>>>>> Recipient:user
>>>>> Theme:checkpoint delay consume message
>>>>>
>>>>> Hello,
>>>>>
>>>>> We noticed the following behavior:
>>>>> If we enable the flink checkpoints, we saw that there is a delay
>>>>> between the time we write a message to the KAFKA topic and the time the
>>>>> flink kafka connector consumes this message.
>>>>> The delay is closely related to checkpointInterval and/or
>>>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>>>> message from KAFKA will be one of these parameters.
>>>>>
>>>>> Could you please advise how we can remove/control this delay?
>>>>>
>>>>> we use flink 1.11.2
>>>>>
>>>>> BR
>>>>> nick
>>>>>
>>>>>


Re: Re: checkpoint delay consume message

2020-12-23 Thread lec ssmi
Checkpoints can be taken synchronously or asynchronously; the latter is
the default.
If you choose the synchronous way, it may cause this problem.

On Wed, Dec 23, 2020 at 3:53 PM, nick toker wrote:

> Hi Yun,
>
> Sorry but we didn't understand your questions.
> The delay we are experiencing is on the *read* side.
> The message is written to kafka topic and consumed by flink with a delay
> that depends on the checkpoints interval
> When we disabled the checkpoints the messages are immediately consumed
> We use the EXACTLY-ONCE semantic.
>
> Please advise.
> BR,
> Nick
>
> ‫בתאריך יום ג׳, 22 בדצמ׳ 2020 ב-9:32 מאת ‪Yun Gao‬‏ <‪yungao...@aliyun.com
> ‬‏>:‬
>
>> Hi nick,
>>
>>Sorry I initially think that the data is also write into Kafka with
>> flink . So it could be ensured that there is no delay in the write side,
>> right ? Does the delay in the read side keeps existing ?
>>
>> Best,
>>  Yun
>>
>>
>>
>> --Original Mail ------
>> *Sender:*nick toker 
>> *Send Date:*Tue Dec 22 01:43:50 2020
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: checkpoint delay consume message
>>
>>> hi
>>>
>>> i am confused
>>>
>>> the delay in in the source when reading message not on the sink
>>>
>>> nick
>>>
>>> ‫בתאריך יום ב׳, 21 בדצמ׳ 2020 ב-18:12 מאת ‪Yun Gao‬‏ <‪
>>> yungao...@aliyun.com‬‏>:‬
>>>
>>>>  Hi Nick,
>>>>
>>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>>> transactions, and only commit the transaction on checkpoint complete to
>>>> ensure end-to-end exactly-once. A detailed description could be find in [1]
>>>>
>>>>
>>>> Best,
>>>>  Yun
>>>>
>>>>
>>>> [1]
>>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>
>>>> --
>>>> Sender:nick toker
>>>> Date:2020/12/21 23:52:34
>>>> Recipient:user
>>>> Theme:checkpoint delay consume message
>>>>
>>>> Hello,
>>>>
>>>> We noticed the following behavior:
>>>> If we enable the flink checkpoints, we saw that there is a delay
>>>> between the time we write a message to the KAFKA topic and the time the
>>>> flink kafka connector consumes this message.
>>>> The delay is closely related to checkpointInterval and/or
>>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>>> message from KAFKA will be one of these parameters.
>>>>
>>>> Could you please advise how we can remove/control this delay?
>>>>
>>>> we use flink 1.11.2
>>>>
>>>> BR
>>>> nick
>>>>
>>>>


Re: Re: checkpoint delay consume message

2020-12-22 Thread nick toker
Hi Yun,

Sorry, but we didn't understand your questions.
The delay we are experiencing is on the *read* side.
The message is written to the Kafka topic and consumed by Flink with a delay
that depends on the checkpoint interval.
When we disable the checkpoints, the messages are consumed immediately.
We use the EXACTLY-ONCE semantic.

Please advise.
BR,
Nick

On Tue, Dec 22, 2020 at 9:32, Yun Gao <yungao...@aliyun.com> wrote:

> Hi nick,
>
>Sorry I initially think that the data is also write into Kafka with
> flink . So it could be ensured that there is no delay in the write side,
> right ? Does the delay in the read side keeps existing ?
>
> Best,
>  Yun
>
>
>
> --Original Mail --
> *Sender:*nick toker 
> *Send Date:*Tue Dec 22 01:43:50 2020
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: checkpoint delay consume message
>
>> hi
>>
>> i am confused
>>
>> the delay in in the source when reading message not on the sink
>>
>> nick
>>
>> ‫בתאריך יום ב׳, 21 בדצמ׳ 2020 ב-18:12 מאת ‪Yun Gao‬‏ <‪
>> yungao...@aliyun.com‬‏>:‬
>>
>>>  Hi Nick,
>>>
>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>> transactions, and only commit the transaction on checkpoint complete to
>>> ensure end-to-end exactly-once. A detailed description could be find in [1]
>>>
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>> [1]
>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>
>>> --
>>> Sender:nick toker
>>> Date:2020/12/21 23:52:34
>>> Recipient:user
>>> Theme:checkpoint delay consume message
>>>
>>> Hello,
>>>
>>> We noticed the following behavior:
>>> If we enable the flink checkpoints, we saw that there is a delay between
>>> the time we write a message to the KAFKA topic and the time the flink kafka
>>> connector consumes this message.
>>> The delay is closely related to checkpointInterval and/or
>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>> message from KAFKA will be one of these parameters.
>>>
>>> Could you please advise how we can remove/control this delay?
>>>
>>> we use flink 1.11.2
>>>
>>> BR
>>> nick
>>>
>>>


Re: Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi nick,

   Sorry, I initially thought that the data was also written into Kafka with Flink,
so it could be ensured that there is no delay on the write side, right? Does
the delay on the read side still exist?

Best,
 Yun




 --Original Mail --
Sender:nick toker 
Send Date:Tue Dec 22 01:43:50 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: checkpoint delay consume message

hi

i am confused

the delay in in the source when reading message not on the sink

nick

On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:

 Hi Nick,

Are you using EXACTLY_ONCE semantics ? If so the sink would use 
transactions, and only commit the transaction on checkpoint complete to ensure 
end-to-end exactly-once. A detailed description could be find in [1]


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints meening that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



Re: checkpoint delay consume message

2020-12-21 Thread nick toker
Hi,

I am confused.

The delay is in the source, when reading messages, not in the sink.

nick

On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:

>  Hi Nick,
>
> Are you using EXACTLY_ONCE semantics ? If so the sink would use
> transactions, and only commit the transaction on checkpoint complete to
> ensure end-to-end exactly-once. A detailed description could be find in [1]
>
>
> Best,
>  Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> --
> Sender:nick toker
> Date:2020/12/21 23:52:34
> Recipient:user
> Theme:checkpoint delay consume message
>
> Hello,
>
> We noticed the following behavior:
> If we enable the flink checkpoints, we saw that there is a delay between
> the time we write a message to the KAFKA topic and the time the flink kafka
> connector consumes this message.
> The delay is closely related to checkpointInterval and/or
> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
> message from KAFKA will be one of these parameters.
>
> Could you please advise how we can remove/control this delay?
>
> we use flink 1.11.2
>
> BR
> nick
>
>


Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
 Hi Nick,

Are you using EXACTLY_ONCE semantics? If so, the sink would use
transactions, and only commit the transaction on checkpoint completion to ensure
end-to-end exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
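For reference, a minimal sketch (editor's addition, not part of the original reply) of how
the checkpointing mode is configured; the interval and pause are placeholders, and the
comment summarizes the transactional-sink behaviour described above:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With EXACTLY_ONCE, a transactional sink (for example a Kafka producer created
        // with Semantic.EXACTLY_ONCE) commits its transaction only when the checkpoint
        // completes, so a downstream consumer reading with isolation.level=read_committed
        // sees the records up to roughly one checkpoint interval later.
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE); // placeholder interval
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // placeholder pause
        // ... sources, transformations, sinks, then env.execute("job");
    }
}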

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints meening that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



Re: checkpoint interval and hdfs file capacity

2020-11-11 Thread Congxian Qiu
Hi
Currently, the checkpoint discard logic is executed in an Executor [1], so old
checkpoints may not be deleted that quickly.

[1]
https://github.com/apache/flink/blob/91404f435f20c5cd6714ee18bf4ccf95c81fb73e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L45

Best,
Congxian


On Tue, Nov 10, 2020 at 2:25 PM, lec ssmi wrote:

> Thanks.
>I have some jobs with the checkpoint interval 1000ms. And the HDFS
> files grow too large to work normally .
> What I am curious about is, are writing and deleting performed
> synchronously? Is it possible to add too fast to delete old files?
>
> Congxian Qiu  于2020年11月10日周二 下午2:16写道:
>
>> Hi
>> No matter what interval you set, Flink will take care of the
>> checkpoints(remove the useless checkpoint when it can), but when you set a
>> very small checkpoint interval, there may be much high pressure for the
>> storage system(here is RPC pressure of HDFS NN).
>>
>> Best,
>> Congxian
>>
>>
>> lec ssmi  于2020年11月10日周二 下午1:19写道:
>>
>>> Hi, if I set the checkpoint interval to be very small, such as 5
>>> seconds, will there be a lot of state files on HDFS? In theory, no matter
>>> what the interval is set, every time you checkpoint, the old file will be
>>> deleted and new file will be written, right?
>>>
>>


Re: Checkpoint growth

2020-11-10 Thread Akshay Aggarwal
Hi Rex,

As per my understanding, there are multiple levels of compaction (with
RocksDB), and files which have not been compacted recently remain in older
checkpoint directories, while the current checkpoint keeps references to those
files. There is no clear way of identifying these references and safely clearing
older checkpoint directories.

What we do instead to avoid ever increasing checkpoint directory size is to
stop the job with a savepoint, clear the checkpoints directory and start
the job from the savepoint periodically.

Thanks,
Akshay Aggarwal
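For context, a minimal sketch (editor's addition) of how incremental checkpoints, which
produce the cross-directory file references described above, are enabled; the checkpoint
URI and interval are placeholders, using the RocksDBStateBackend API from the Flink
releases discussed in this thread:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The second argument enables incremental checkpoints: each checkpoint uploads only
        // new SST files and keeps references to unchanged files in earlier checkpoint
        // directories, which is why old chk-* directories cannot simply be deleted by hand.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // placeholder URI
        env.enableCheckpointing(60_000); // placeholder interval
    }
}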

On Tue, Nov 10, 2020 at 10:55 PM Rex Fenley  wrote:

> Hello,
>
> I'm reading the docs/blog on incremental checkpoints and it says:
>
> >You can also no longer delete old checkpoints as newer checkpoints need
> them, and the history of differences between checkpoints can grow
> indefinitely over time. You need to plan for larger distributed storage to
> maintain the checkpoints and the network overhead to read from it.
> source:
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
> I'm wondering why this would be true though. It says earlier that
> incremental checkpoints compact so why would the history grow indefinitely?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>



Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread lec ssmi
Thanks.
   I have some jobs with a checkpoint interval of 1000 ms, and the HDFS
files grow too large for the cluster to work normally.
What I am curious about is: are writing and deleting performed
synchronously? Is it possible that new files are added faster than old ones are deleted?

On Tue, Nov 10, 2020 at 2:16 PM, Congxian Qiu wrote:

> Hi
> No matter what interval you set, Flink will take care of the
> checkpoints(remove the useless checkpoint when it can), but when you set a
> very small checkpoint interval, there may be much high pressure for the
> storage system(here is RPC pressure of HDFS NN).
>
> Best,
> Congxian
>
>
> lec ssmi  于2020年11月10日周二 下午1:19写道:
>
>> Hi, if I set the checkpoint interval to be very small, such as 5 seconds,
>> will there be a lot of state files on HDFS? In theory, no matter what the
>> interval is set, every time you checkpoint, the old file will be deleted
>> and new file will be written, right?
>>
>


Re: checkpoint interval and hdfs file capacity

2020-11-09 Thread Congxian Qiu
Hi
No matter what interval you set, Flink will take care of the
checkpoints (removing useless checkpoints when it can), but when you set a
very small checkpoint interval, there may be very high pressure on the
storage system (here, RPC pressure on the HDFS NameNode).

Best,
Congxian


On Tue, Nov 10, 2020 at 1:19 PM, lec ssmi wrote:

> Hi, if I set the checkpoint interval to be very small, such as 5 seconds,
> will there be a lot of state files on HDFS? In theory, no matter what the
> interval is set, every time you checkpoint, the old file will be deleted
> and new file will be written, right?
>
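Following the advice above about very small intervals, a minimal sketch (editor's addition)
of a more conservative checkpoint configuration; all values are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A larger interval plus a minimum pause bounds the rate at which new checkpoint
        // files (and HDFS NameNode RPCs) are produced, and a single concurrent checkpoint
        // avoids creating files faster than old ones can be discarded.
        env.enableCheckpointing(60_000);                                  // placeholder: 60 s instead of 5 s
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // placeholder
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}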


Re: checkpoint fail

2020-10-10 Thread Yun Tang
Hi Song

Flink 1.4.2 is a bit too old, and I think this error is caused by FLINK-8876
[1][2], which should be fixed as of Flink 1.5. Please consider upgrading your Flink
version.

[1] https://issues.apache.org/jira/browse/FLINK-8876
[2] https://issues.apache.org/jira/browse/FLINK-8836


Best
Yun Tang

From: Song Wu 
Sent: Saturday, October 10, 2020 11:03
To: user 
Subject: checkpoint fail

Summary
I'm hitting an error when running a job; it has happened several times, and I don't
know why.

Any help would be appreciated.  Thanks!

Details


flink version: 1.4.2-1700


java.lang.Exception: Could not complete snapshot 158 for operator 
asyncio_by_transform -> flatmap_by_action_list_flat -> 
order_source_kafka_sink-preprocessing (3/10).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:370)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1285)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1223)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:707)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:622)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:575)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:217)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:198)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:107)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:48)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.(DefaultOperatorStateBackend.java:453)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:465)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:355)





Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Eleanore Jin
Thanks a lot for the confirmation.

Eleanore

On Fri, Oct 2, 2020 at 2:42 AM Chesnay Schepler  wrote:

> Yes, the patch call only triggers the cancellation.
> You can check whether it is complete by polling the job status via
> jobs/<jobID> and checking whether the state is CANCELED.
>
> On 9/27/2020 7:02 PM, Eleanore Jin wrote:
>
> I have noticed this: if I have Thread.sleep(1500); after the patch call
> returned 202, then the directory gets cleaned up, in the meanwhile, it
> shows the job-manager pod is in completed state before getting terminated:
> see screenshot: https://ibb.co/3F8HsvG
>
> So the patch call is async to terminate the job? Is there a way to check
> if cancel is completed? So that the stop tm and jm can be called afterwards?
>
> Thanks a lot!
> Eleanore
>
>
> On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin 
> wrote:
>
>> Hi Congxian,
>> I am making rest call to get the checkpoint config: curl -X GET \
>>
>> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>>
>> and here is the response:
>> {
>> "mode": "at_least_once",
>> "interval": 3000,
>> "timeout": 1,
>> "min_pause": 1000,
>> "max_concurrent": 1,
>> "externalization": {
>> "enabled": false,
>> "delete_on_cancellation": true
>> },
>> "state_backend": "FsStateBackend"
>> }
>>
>> I uploaded a screenshot of how azure blob storage looks like after the
>> cancel call : https://ibb.co/vY64pMZ
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
>> wrote:
>>
>>> Hi Eleanore
>>>
>>> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
>>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>>> checkpoint will be kept when canceling a job.
>>>
>>> PS the image did not show
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>> Best,
>>> Congxian
>>>
>>>
>>> Eleanore Jin  于2020年9月27日周日 下午1:50写道:
>>>
 Hi experts,

 I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint
 is enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
 using FsStateBackend, snapshots are persisted to azure blob storage
 (Microsoft cloud storage service).

 Checkpointed state is just source kafka topic offsets, the flink job is
 stateless as it does filter/json transformation.

 The way I am trying to stop the flink job is via monitoring rest api
 mentioned in doc
 

 e.g.
 curl -X PATCH \
   '
 http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
 \
   -H 'Content-Type: application/json' \
   -d '{}'

 This call returned successfully with statusCode 202, then I stopped the
 task manager pods and job manager pod.

 According to the doc, the checkpoint should be cleaned up after the job
 is stopped/cancelled.
 What I have observed is, the checkpoint dir is not cleaned up, can you
 please shield some lights on what I did wrong?

 Below shows the checkpoint dir for a cancelled flink job.
 [image: image.png]

 Thanks!
 Eleanore


>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Chesnay Schepler

Yes, the patch call only triggers the cancellation.
You can check whether it is complete by polling the job status via
jobs/<jobID> and checking whether the state is CANCELED.
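A minimal sketch of that polling loop (editor's addition), assuming Java 11's HttpClient
and the standard GET /jobs/<jobID> endpoint, whose JSON response contains a "state" field;
the address and job ID are placeholders taken from the examples in this thread:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobStatusPoller {
    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";              // placeholder REST address
        String jobId = "3c00535c182a3a00258e2f57bc11fb1a";  // placeholder job ID
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId))
                .GET()
                .build();

        while (true) {
            String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            // Crude string check; a real client would parse the JSON and read the "state" field.
            if (body.contains("\"state\":\"CANCELED\"")) {
                System.out.println("Job is CANCELED; safe to stop the TM and JM pods.");
                break;
            }
            Thread.sleep(1000); // poll once per second
        }
    }
}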


On 9/27/2020 7:02 PM, Eleanore Jin wrote:
I have noticed this: if I have Thread.sleep(1500); after the patch 
call returned 202, then the directory gets cleaned up, in the 
meanwhile, it shows the job-manager pod is in completed state before 
getting terminated: see screenshot: https://ibb.co/3F8HsvG


So the patch call is async to terminate the job? Is there a way to 
check if cancel is completed? So that the stop tm and jm can be called 
afterwards?


Thanks a lot!
Eleanore


On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin wrote:


Hi Congxian,
I am making rest call to get the checkpoint config: curl -X GET \

http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config


and here is the response:
{
    "mode": "at_least_once",
    "interval": 3000,
    "timeout": 1,
    "min_pause": 1000,
    "max_concurrent": 1,
    "externalization": {
        "enabled": false,
        "delete_on_cancellation": true
    },
    "state_backend": "FsStateBackend"
}

I uploaded a screenshot of how azure blob storage looks like after
the cancel call : https://ibb.co/vY64pMZ

Thanks a lot!
Eleanore

On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

Hi Eleanore
    What the `CheckpointRetentionPolicy`[1] did you set for
your job? if
`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set,
then the checkpoint will be kept when canceling a job.

PS the image did not show

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
Best,
Congxian


On Sun, Sep 27, 2020 at 1:50 PM, Eleanore Jin <eleanore@gmail.com> wrote:

Hi experts,

I am running flink 1.10.2 on kubernetes as per job
cluster. Checkpoint is enabled, with interval 3s,
minimumPause 1s, timeout 10s. I'm using FsStateBackend,
snapshots are persisted to azure blob storage (Microsoft
cloud storage service).

Checkpointed state is just source kafka topic offsets, the
flink job is stateless as it does filter/json transformation.

The way I am trying to stop the flink job is via
monitoring rest api mentioned in doc



e.g.
curl -X PATCH \
 

'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
\
  -H 'Content-Type: application/json' \
  -d '{}'

This call returned successfully with statusCode 202, then
I stopped the task manager pods and job manager pod.

According to the doc, the checkpoint should be cleaned up
after the job is stopped/cancelled.
What I have observed is, the checkpoint dir is not cleaned
up, can you please shield some lights on what I did wrong?

Below shows the checkpoint dir for a cancelled flink job.
image.png

Thanks!
Eleanore





Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-29 Thread Till Rohrmann
Great, thanks Klou!

Cheers,
Till

On Mon, Sep 28, 2020 at 5:07 PM Kostas Kloudas  wrote:

> Hi all,
>
> I will have a look.
>
> Kostas
>
> On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann 
> wrote:
> >
> > Hi Cristian,
> >
> > thanks for reporting this issue. It looks indeed like a very critical
> problem.
> >
> > The problem seems to be that the ApplicationDispatcherBootstrap class
> produces an exception (that the request job can no longer be found because
> of a lost ZooKeeper connection) which will be interpreted as a job failure.
> Due to this interpretation, the cluster will be shut down with a terminal
> state of FAILED which will cause the HA data to be cleaned up. The exact
> problem occurs in the JobStatusPollingUtils.getJobResult which is called by
> ApplicationDispatcherBootstrap.getJobResult().
> >
> > I think there are two problems here: First of all not every exception
> bubbling up in the future returned by
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
> job failure. Some of them can also indicate a framework failure which
> should not lead to the clean up of HA data. The other problem is that the
> polling logic cannot properly handle a temporary connection loss to
> ZooKeeper which is a normal situation.
> >
> > I am pulling in Aljoscha and Klou who worked on this feature and might
> be able to propose a solution for these problems. I've also updated the
> JIRA issue FLINK-19154.
> >
> > Cheers,
> > Till
> >
> > On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:
> >>
> >> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >> Since we could submit multiple jobs into a Flink session, what i mean
> is when a job
> >> reached to the terminal state, the sub node(e.g.
> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> >> on the Zookeeper will be cleaned up. But the root
> directory(/flink/application_/) still exists.
> >>
> >>
> >> For your current case, it is a different case(perjob cluster). I think
> we need to figure out why the only
> >> running job reached the terminal state. For example, the restart
> attempts are exhausted. And you
> >> could find the following logs in your JobManager log.
> >>
> >> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
> >>
> >>
> >> Best,
> >> Yang
> >>
> >>
> >>
> >>
> >> Cristian  于2020年9月9日周三 上午11:26写道:
> >>>
> >>> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >>>
> >>> What does this mean?
> >>>
> >>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
> >>>
> >>> The only cases where I expect Flink to clean up the checkpoint data
> from ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
> >>>
> >>> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
> >>>
> >>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
> >>>
> >>> AFAIK, the HA data, including Zookeeper meta data and real data on
> DFS, will only be cleaned up
> >>> when the Flink cluster reached terminated state.
> >>>
> >>> So if you are using a session cluster, the root cluster node on Zk
> will be cleaned up after you manually
> >>> stop the session cluster. The job sub directory will be cleaned up
> when the job finished/canceled/failed.
> >>>
> >>> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> >>> be cleaned up. I think you need to check the job restart strategy you
> have set. For example, the following
> >>> configuration will make the Flink cluster terminated after 10 attempts.
> >>>
> >>> restart-strategy: fixed-delay
> >>> restart-strategy.fixed-delay.attempts: 10
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Cristian  于2020年9月9日周三 上午12:28写道:
> >>>
> >>>
> >>> I'm using the standalone script to start the cluster.
> >>>
> >>> As far as I can tell, it's not easy to reproduce. We found that
> zookeeper lost a node around the time this happened, but all of our other
> 75 Flink jobs which use the same setup, version and zookeeper, didn't have
> any issues. They didn't even restart.
> >>>
> >>> So unfortunately I don't know how to reproduce this. All I know is I
> can't sleep. I have nightmares were my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckon is near. Flinkpocalypse!
> >>>
> >>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
> >>>
> >>> Thanks a lot for reporting this problem here Cristian!
> >>>
> >>> I am not super familiar with the involved components, but the behavior
> you are describing doesn't sound right to me.
> >>> Which entrypoint are you using? This is logg

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-28 Thread Kostas Kloudas
Hi all,

I will have a look.

Kostas

On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann  wrote:
>
> Hi Cristian,
>
> thanks for reporting this issue. It looks indeed like a very critical problem.
>
> The problem seems to be that the ApplicationDispatcherBootstrap class 
> produces an exception (that the request job can no longer be found because of 
> a lost ZooKeeper connection) which will be interpreted as a job failure. Due 
> to this interpretation, the cluster will be shut down with a terminal state 
> of FAILED which will cause the HA data to be cleaned up. The exact problem 
> occurs in the JobStatusPollingUtils.getJobResult which is called by 
> ApplicationDispatcherBootstrap.getJobResult().
>
> I think there are two problems here: First of all not every exception 
> bubbling up in the future returned by 
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a 
> job failure. Some of them can also indicate a framework failure which should 
> not lead to the clean up of HA data. The other problem is that the polling 
> logic cannot properly handle a temporary connection loss to ZooKeeper which 
> is a normal situation.
>
> I am pulling in Aljoscha and Klou who worked on this feature and might be 
> able to propose a solution for these problems. I've also updated the JIRA 
> issue FLINK-19154.
>
> Cheers,
> Till
>
> On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:
>>
>> > The job sub directory will be cleaned up when the job 
>> > finished/canceled/failed.
>> Since we could submit multiple jobs into a Flink session, what i mean is 
>> when a job
>> reached to the terminal state, the sub node(e.g. 
>> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
>> on the Zookeeper will be cleaned up. But the root 
>> directory(/flink/application_/) still exists.
>>
>>
>> For your current case, it is a different case(perjob cluster). I think we 
>> need to figure out why the only
>> running job reached the terminal state. For example, the restart attempts 
>> are exhausted. And you
>> could find the following logs in your JobManager log.
>>
>> "org.apache.flink.runtime.JobException: Recovery is suppressed by 
>> NoRestartBackoffTimeStrategy"
>>
>>
>> Best,
>> Yang
>>
>>
>>
>>
>> Cristian  于2020年9月9日周三 上午11:26写道:
>>>
>>> > The job sub directory will be cleaned up when the job 
>>> > finished/canceled/failed.
>>>
>>> What does this mean?
>>>
>>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the 
>>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>>
>>> The only cases where I expect Flink to clean up the checkpoint data from ZK 
>>> is when I explicitly stop or cancel the job (in those cases the job manager 
>>> takes a savepoint before cleaning up zk and finishing the cluster).
>>>
>>> Which is not the case here. Flink was on autopilot here and decided to wipe 
>>> my poor, good checkpoint metadata as the logs show.
>>>
>>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>>
>>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, 
>>> will only be cleaned up
>>> when the Flink cluster reached terminated state.
>>>
>>> So if you are using a session cluster, the root cluster node on Zk will be 
>>> cleaned up after you manually
>>> stop the session cluster. The job sub directory will be cleaned up when the 
>>> job finished/canceled/failed.
>>>
>>> If you are using a job/application cluster, once the only running job 
>>> finished/failed, all the HA data will
>>> be cleaned up. I think you need to check the job restart strategy you have 
>>> set. For example, the following
>>> configuration will make the Flink cluster terminated after 10 attempts.
>>>
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Cristian  于2020年9月9日周三 上午12:28写道:
>>>
>>>
>>> I'm using the standalone script to start the cluster.
>>>
>>> As far as I can tell, it's not easy to reproduce. We found that zookeeper 
>>> lost a node around the time this happened, but all of our other 75 Flink 
>>> jobs which use the same setup, version and zookeeper, didn't have any 
>>> issues. They didn't even restart.
>>>
>>> So unfortunately I don't know how to reproduce this. All I know is I can't 
>>> sleep. I have nightmares were my precious state is deleted. I wake up 
>>> crying and quickly start manually savepointing all jobs just in case, 
>>> because I feel the day of reckon is near. Flinkpocalypse!
>>>
>>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>>
>>> Thanks a lot for reporting this problem here Cristian!
>>>
>>> I am not super familiar with the involved components, but the behavior you 
>>> are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: 
>>> "2020-09-08 14:45:32,807 INFO  
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
>>> StandaloneSessionClusterEntrypo

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-28 Thread Till Rohrmann
Hi Cristian,

thanks for reporting this issue. It looks indeed like a very critical
problem.

The problem seems to be that the ApplicationDispatcherBootstrap class
produces an exception (that the requested job can no longer be found because
of a lost ZooKeeper connection) which will be interpreted as a job failure.
Due to this interpretation, the cluster will be shut down with a terminal
state of FAILED which will cause the HA data to be cleaned up. The exact
problem occurs in the JobStatusPollingUtils.getJobResult which is called by
ApplicationDispatcherBootstrap.getJobResult().

I think there are two problems here: First of all not every exception
bubbling up in the future returned by
ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
job failure. Some of them can also indicate a framework failure which
should not lead to the clean up of HA data. The other problem is that the
polling logic cannot properly handle a temporary connection loss to
ZooKeeper which is a normal situation.

I am pulling in Aljoscha and Klou who worked on this feature and might be
able to propose a solution for these problems. I've also updated the JIRA
issue FLINK-19154.

Cheers,
Till

On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> Since we could submit multiple jobs into a Flink session, what i mean is
> when a job
> reached to the terminal state, the sub node(e.g.
> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> on the Zookeeper will be cleaned up. But the root
> directory(/flink/application_/) still exists.
>
>
> For your current case, it is a different case(perjob cluster). I think we
> need to figure out why the only
> running job reached the terminal state. For example, the restart attempts
> are exhausted. And you
> could find the following logs in your JobManager log.
>
> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
>
>
> Best,
> Yang
>
>
>
>
> Cristian  于2020年9月9日周三 上午11:26写道:
>
>> > The job sub directory will be cleaned up when the job
>> finished/canceled/failed.
>>
>> What does this mean?
>>
>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>
>> The only cases where I expect Flink to clean up the checkpoint data from
>> ZK is when I explicitly stop or cancel the job (in those cases the job
>> manager takes a savepoint before cleaning up zk and finishing the cluster).
>>
>> Which is not the case here. Flink was on autopilot here and decided to
>> wipe my poor, good checkpoint metadata as the logs show.
>>
>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>
>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
>> will only be cleaned up
>> when the Flink cluster reaches a terminal state.
>>
>> So if you are using a session cluster, the root cluster node on Zk will
>> be cleaned up after you manually
>> stop the session cluster. The job sub directory will be cleaned up when
>> the job finished/canceled/failed.
>>
>> If you are using a job/application cluster, once the only running job
>> finished/failed, all the HA data will
>> be cleaned up. I think you need to check the job restart strategy you
>> have set. For example, the following
>> configuration will make the Flink cluster terminate after 10 failed attempts.
>>
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>>
>>
>> Best,
>> Yang
>>
>> Cristian  于2020年9月9日周三 上午12:28写道:
>>
>>
>> I'm using the standalone script to start the cluster.
>>
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper
>> lost a node around the time this happened, but all of our other 75 Flink
>> jobs which use the same setup, version and zookeeper, didn't have any
>> issues. They didn't even restart.
>>
>> So unfortunately I don't know how to reproduce this. All I know is I
>> can't sleep. I have nightmares where my precious state is deleted. I wake up
>> crying and quickly start manually savepointing all jobs just in case,
>> because I feel the day of reckoning is near. Flinkpocalypse!
>>
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>
>> Thanks a lot for reporting this problem here Cristian!
>>
>> I am not super familiar with the involved components, but the behavior
>> you are describing doesn't sound right to me.
>> Which entrypoint are you using? This is logged at the beginning, like
>> this: "2020-09-08 14:45:32,807 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
>> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>
>> Do you know by chance if this problem is reproducible? With
>> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
>> problem.
>>
>>
>>
>>
>> On Tue, Sep 8, 2020 at 4:00 AM Hu

Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-27 Thread Eleanore Jin
I have noticed this: if I add Thread.sleep(1500); after the PATCH call
returns 202, then the directory gets cleaned up; in the meantime, the
job-manager pod shows as completed before getting terminated:
see screenshot: https://ibb.co/3F8HsvG

So the PATCH call terminates the job asynchronously? Is there a way to check
whether the cancellation is complete, so that stopping the TM and JM pods can
be done afterwards?
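
One way to check that (a sketch, not from this thread; the job id below is a
placeholder) is to keep polling the job overview through the same monitoring
REST API and only stop the TM and JM pods once the reported state is no longer
RUNNING or CANCELLING:

curl -s http://localhost:8081/jobs/<job-id>
# the returned JSON contains a "state" field, e.g. "RUNNING", "CANCELLING"
# or "CANCELED"; wait for "CANCELED" before tearing the pods down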

Thanks a lot!
Eleanore


On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin  wrote:

> Hi Congxian,
> I am making rest call to get the checkpoint config: curl -X GET \
>
> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>
> and here is the response:
> {
> "mode": "at_least_once",
> "interval": 3000,
> "timeout": 1,
> "min_pause": 1000,
> "max_concurrent": 1,
> "externalization": {
> "enabled": false,
> "delete_on_cancellation": true
> },
> "state_backend": "FsStateBackend"
> }
>
> I uploaded a screenshot of how azure blob storage looks like after the
> cancel call : https://ibb.co/vY64pMZ
>
> Thanks a lot!
> Eleanore
>
> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
> wrote:
>
>> Hi Eleanore
>>
>> Which `CheckpointRetentionPolicy`[1] did you set for your job? If
>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>> checkpoint will be kept when the job is canceled.
>>
>> PS the image did not show
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>> Best,
>> Congxian
>>
>>
>> Eleanore Jin  于2020年9月27日周日 下午1:50写道:
>>
>>> Hi experts,
>>>
>>> I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint
>>> is enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
>>> using FsStateBackend, snapshots are persisted to azure blob storage
>>> (Microsoft cloud storage service).
>>>
>>> Checkpointed state is just source kafka topic offsets, the flink job is
>>> stateless as it does filter/json transformation.
>>>
>>> The way I am trying to stop the flink job is via monitoring rest api
>>> mentioned in doc
>>> 
>>>
>>> e.g.
>>> curl -X PATCH \
>>>   '
>>> http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
>>> \
>>>   -H 'Content-Type: application/json' \
>>>   -d '{}'
>>>
>>> This call returned successfully with statusCode 202, then I stopped the
>>> task manager pods and job manager pod.
>>>
>>> According to the doc, the checkpoint should be cleaned up after the job
>>> is stopped/cancelled.
>>> What I have observed is, the checkpoint dir is not cleaned up, can you
>>> please shed some light on what I did wrong?
>>>
>>> Below shows the checkpoint dir for a cancelled flink job.
>>> [image: image.png]
>>>
>>> Thanks!
>>> Eleanore
>>>
>>>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-27 Thread Eleanore Jin
Hi Congxian,
I am making rest call to get the checkpoint config: curl -X GET \

http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config

and here is the response:
{
"mode": "at_least_once",
"interval": 3000,
"timeout": 1,
"min_pause": 1000,
"max_concurrent": 1,
"externalization": {
"enabled": false,
"delete_on_cancellation": true
},
"state_backend": "FsStateBackend"
}

I uploaded a screenshot of how azure blob storage looks like after the
cancel call : https://ibb.co/vY64pMZ

Thanks a lot!
Eleanore

On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
wrote:

> Hi Eleanore
>
> Which `CheckpointRetentionPolicy`[1] did you set for your job? If
> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
> checkpoint will be kept when the job is canceled.
>
> PS the image did not show
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
> Best,
> Congxian
>
>
> Eleanore Jin  于2020年9月27日周日 下午1:50写道:
>
>> Hi experts,
>>
>> I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint is
>> enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
>> using FsStateBackend, snapshots are persisted to azure blob storage
>> (Microsoft cloud storage service).
>>
>> Checkpointed state is just source kafka topic offsets, the flink job is
>> stateless as it does filter/json transformation.
>>
>> The way I am trying to stop the flink job is via monitoring rest api
>> mentioned in doc
>> 
>>
>> e.g.
>> curl -X PATCH \
>>   '
>> http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
>> \
>>   -H 'Content-Type: application/json' \
>>   -d '{}'
>>
>> This call returned successfully with statusCode 202, then I stopped the
>> task manager pods and job manager pod.
>>
>> According to the doc, the checkpoint should be cleaned up after the job
>> is stopped/cancelled.
>> What I have observed is, the checkpoint dir is not cleaned up, can you
>> please shed some light on what I did wrong?
>>
>> Below shows the checkpoint dir for a cancelled flink job.
>> [image: image.png]
>>
>> Thanks!
>> Eleanore
>>
>>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-26 Thread Congxian Qiu
Hi Eleanore

Which `CheckpointRetentionPolicy`[1] did you set for your job? If
`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
checkpoint will be kept when the job is canceled.
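
For reference, a minimal sketch (not from this thread) of how retaining
checkpoints on cancellation can be enabled programmatically, assuming the
DataStream API of the Flink versions discussed here:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint every 3 seconds, matching the interval used in this thread
env.enableCheckpointing(3000);
// keep the last checkpoint when the job is cancelled instead of deleting it
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

With DELETE_ON_CANCELLATION (the behaviour shown in the config response above),
the checkpoint directory is expected to be removed only once the cancellation
has actually completed.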

PS the image did not show

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
Best,
Congxian


Eleanore Jin  于2020年9月27日周日 下午1:50写道:

> Hi experts,
>
> I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint is
> enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
> using FsStateBackend, snapshots are persisted to azure blob storage
> (Microsoft cloud storage service).
>
> Checkpointed state is just source kafka topic offsets, the flink job is
> stateless as it does filter/json transformation.
>
> The way I am trying to stop the flink job is via monitoring rest api
> mentioned in doc
> 
>
> e.g.
> curl -X PATCH \
>   'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
> \
>   -H 'Content-Type: application/json' \
>   -d '{}'
>
> This call returned successfully with statusCode 202, then I stopped the
> task manager pods and job manager pod.
>
> According to the doc, the checkpoint should be cleaned up after the job is
> stopped/cancelled.
> What I have observed is, the checkpoint dir is not cleaned up, can you
> please shed some light on what I did wrong?
>
> Below shows the checkpoint dir for a cancelled flink job.
> [image: image.png]
>
> Thanks!
> Eleanore
>
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-09 Thread Yang Wang
> The job sub directory will be cleaned up when the job
finished/canceled/failed.
Since we can submit multiple jobs into a Flink session, what I mean is that
when a job reaches a terminal state, the sub node (e.g.
/flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
in ZooKeeper will be cleaned up, but the root
directory (/flink/application_/) still exists.


Your current case is different (a per-job cluster). I think we
need to figure out why the only running job reached a terminal state, for
example because the restart attempts were exhausted. In that case you
could find the following log line in your JobManager log.

"org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy"


Best,
Yang




Cristian  于2020年9月9日周三 上午11:26写道:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
>
> What does this mean?
>
> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>
> The only cases where I expect Flink to clean up the checkpoint data from
> ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
>
> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
>
> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>
> AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
> will only be cleaned up
> when the Flink cluster reaches a terminal state.
>
> So if you are using a session cluster, the root cluster node on Zk will be
> cleaned up after you manually
> stop the session cluster. The job sub directory will be cleaned up when
> the job finished/canceled/failed.
>
> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> be cleaned up. I think you need to check the job restart strategy you have
> set. For example, the following
> configuration will make the Flink cluster terminate after 10 failed attempts.
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
>
>
> Best,
> Yang
>
> Cristian  于2020年9月9日周三 上午12:28写道:
>
>
> I'm using the standalone script to start the cluster.
>
> As far as I can tell, it's not easy to reproduce. We found that zookeeper
> lost a node around the time this happened, but all of our other 75 Flink
> jobs which use the same setup, version and zookeeper, didn't have any
> issues. They didn't even restart.
>
> So unfortunately I don't know how to reproduce this. All I know is I can't
> sleep. I have nightmares where my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckoning is near. Flinkpocalypse!
>
> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>
> Thanks a lot for reporting this problem here Cristian!
>
> I am not super familiar with the involved components, but the behavior you
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>
> Do you know by chance if this problem is reproducible? With
> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
> problem.
>
>
>
>
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
>
> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-08 Thread Cristian
> The job sub directory will be cleaned up when the job 
> finished/canceled/failed.

What does this mean? 

Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the time... 
and yet, the jobs would ALWAYS resume from the last checkpoint. 

The only cases where I expect Flink to clean up the checkpoint data from ZK is 
when I explicitly stop or cancel the job (in those cases the job manager takes 
a savepoint before cleaning up zk and finishing the cluster). 

Which is not the case here. Flink was on autopilot here and decided to wipe my 
poor, good checkpoint metadata as the logs show. 

On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, will 
> only be cleaned up
> when the Flink cluster reaches a terminal state.
> 
> So if you are using a session cluster, the root cluster node on Zk will be 
> cleaned up after you manually
> stop the session cluster. The job sub directory will be cleaned up when the 
> job finished/canceled/failed.
> 
> If you are using a job/application cluster, once the only running job 
> finished/failed, all the HA data will
> be cleaned up. I think you need to check the job restart strategy you have 
> set. For example, the following
> configuration will make the Flink cluster terminate after 10 failed attempts.
> 
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> 
> 
> Best,
> Yang
> 
> Cristian  于2020年9月9日周三 上午12:28写道:
>> __
>> I'm using the standalone script to start the cluster. 
>> 
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper 
>> lost a node around the time this happened, but all of our other 75 Flink 
>> jobs which use the same setup, version and zookeeper, didn't have any 
>> issues. They didn't even restart. 
>> 
>> So unfortunately I don't know how to reproduce this. All I know is I can't 
>> sleep. I have nightmares where my precious state is deleted. I wake up crying 
>> and quickly start manually savepointing all jobs just in case, because I 
>> feel the day of reckoning is near. Flinkpocalypse!
>> 
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>> Thanks a lot for reporting this problem here Cristian!
>>> 
>>> I am not super familiar with the involved components, but the behavior you 
>>> are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: 
>>> "2020-09-08 14:45:32,807 INFO  
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
>>> StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, 
>>> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>> 
>>> Do you know by chance if this problem is reproducible? With the 
>>> StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
 Hi Cristian,
 
 
 I don't know if it was designed to be like this deliberately.
 
 So I have already submitted an issue ,and wait for somebody to response.
 
 https://issues.apache.org/jira/browse/FLINK-19154   
 
 
 
 --
 Sent from: 
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-08 Thread Yang Wang
AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
will only be cleaned up
when the Flink cluster reaches a terminal state.

So if you are using a session cluster, the root cluster node on Zk will be
cleaned up after you manually
stop the session cluster. The job sub directory will be cleaned up when the
job finished/canceled/failed.

If you are using a job/application cluster, once the only running job
finished/failed, all the HA data will
be cleaned up. I think you need to check the job restart strategy you have
set. For example, the following
configuration will make the Flink cluster terminate after 10 failed attempts.

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
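
The same thing can also be expressed in the job code (a sketch, not from this
thread; the 10-second delay is an illustrative value):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// give up after 10 failed attempts, waiting 10 seconds between attempts;
// once the attempts are exhausted the job reaches a terminal FAILED state
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(10)));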


Best,
Yang

Cristian  于2020年9月9日周三 上午12:28写道:

> I'm using the standalone script to start the cluster.
>
> As far as I can tell, it's not easy to reproduce. We found that zookeeper
> lost a node around the time this happened, but all of our other 75 Flink
> jobs which use the same setup, version and zookeeper, didn't have any
> issues. They didn't even restart.
>
> So unfortunately I don't know how to reproduce this. All I know is I can't
> sleep. I have nightmares where my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckoning is near. Flinkpocalypse!
>
> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>
> Thanks a lot for reporting this problem here Cristian!
>
> I am not super familiar with the involved components, but the behavior you
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>
> Do you know by chance if this problem is reproducible? With
> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
> problem.
>
>
>
>
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
>
> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-08 Thread Cristian
I'm using the standalone script to start the cluster. 

As far as I can tell, it's not easy to reproduce. We found that zookeeper lost 
a node around the time this happened, but all of our other 75 Flink jobs which 
use the same setup, version and zookeeper, didn't have any issues. They didn't 
even restart. 

So unfortunately I don't know how to reproduce this. All I know is I can't 
sleep. I have nightmares where my precious state is deleted. I wake up crying 
and quickly start manually savepointing all jobs just in case, because I feel 
the day of reckoning is near. Flinkpocalypse!

On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
> Thanks a lot for reporting this problem here Cristian!
> 
> I am not super familiar with the involved components, but the behavior you 
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like this: 
> "2020-09-08 14:45:32,807 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
> StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, 
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
> 
> Do you know by chance if this problem is reproducible? With the 
> StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
> 
> 
> 
> 
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
>> Hi Cristian,
>> 
>> 
>> I don't know if it was designed to be like this deliberately.
>> 
>> So I have already submitted an issue ,and wait for somebody to response.
>> 
>> https://issues.apache.org/jira/browse/FLINK-19154   
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-08 Thread Robert Metzger
Thanks a lot for reporting this problem here Cristian!

I am not super familiar with the involved components, but the behavior you
are describing doesn't sound right to me.
Which entrypoint are you using? This is logged at the beginning, like this:
"2020-09-08 14:45:32,807 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"

Do you know by chance if this problem is reproducible? With
the StandaloneSessionClusterEntrypoint I was not able to reproduce the
problem.




On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:

> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Husky Zeng
Hi Cristian,


I don't know if it was designed to be like this deliberately.

So I have already submitted an issue ,and wait for somebody to response.

https://issues.apache.org/jira/browse/FLINK-19154   



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Cristian
That's an excellent question. I can't explain that. All I know is this:

- the job was upgraded and resumed from a savepoint 
- After hours of working fine, it failed (like it shows in the logs) 
- the Metadata was cleaned up, again as shown in the logs
- because I run this in Kubernetes, the container was restarted immediately, 
and because nothing was found in zookeeper it started again from the savepoint 

I didn't realize this was happening until a couple of hours later. At that 
point the job had already checkpointed several times, and it was futile to try 
to start it from a retained checkpoint (assuming there were any). 

My question is... Is this a bug or not? 

On Mon, Sep 7, 2020, at 1:53 AM, Husky Zeng wrote:
> I mean that checkpoints are usually dropped after the job is terminated by
> the user (except if explicitly configured as retained checkpoints). You
> could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to keep
> your checkpoints when it comes to a failure.
> 
> When your ZooKeeper connection was lost, the high-availability system, which
> relies on ZooKeeper, also failed, and that caused your application to stop
> without retrying.
> 
> I have a question: if your application lost the ZooKeeper connection, how did
> it delete the data in ZooKeeper?
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-07 Thread Husky Zeng
I mean that checkpoints are usually dropped after the job is terminated by
the user (except if explicitly configured as retained checkpoints). You
could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to keep
your checkpoints when it comes to a failure.

When your ZooKeeper connection was lost, the high-availability system, which
relies on ZooKeeper, also failed, and that caused your application to stop
without retrying.

I have a question: if your application lost the ZooKeeper connection, how did
it delete the data in ZooKeeper?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-05 Thread Cristian
> If you want to save your checkPoint,you could  refer to this document

What do you mean? We already persist our savepoints, and we do not delete them 
explicitly ever. 

The problem is that Flink deleted the data from zookeeper when it shouldn't 
have. Is it possible to start a job from a checkpoint using --fromSavepoint? 


On Sat, Sep 5, 2020, at 2:05 AM, Husky Zeng wrote:
> 
> Hi Cristian,
> 
> From this code, we can see that the Exception or Error is ignored in
> dispatcher.shutDownCluster(applicationStatus).
> 
> ``
> org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster
> 
> return applicationCompletionFuture
>   .handle((r, t) -> {
>   final ApplicationStatus applicationStatus;
>   if (t != null) {
> 
>   final Optional 
> cancellationException =
>   ExceptionUtils.findThrowable(t, 
> JobCancellationException.class);
> 
>   if (cancellationException.isPresent()) {
>   // this means the Flink Job was 
> cancelled
>   applicationStatus = 
> ApplicationStatus.CANCELED;
>   } else if (t instanceof CancellationException) {
>   // this means that the future was 
> cancelled
>   applicationStatus = 
> ApplicationStatus.UNKNOWN;
>   } else {
>   applicationStatus = 
> ApplicationStatus.FAILED;
>   }
> 
>   LOG.warn("Application {}: ", applicationStatus, 
> t);
>   } else {
>   applicationStatus = ApplicationStatus.SUCCEEDED;
>   LOG.info("Application completed SUCCESSFULLY");
>   }
>   return dispatcher.shutDownCluster(applicationStatus);
>   })
>   .thenCompose(Function.identity());
> 
> ``
> 
> 
> So when it comes to java.util.concurrent.CompletableFuture#whenComplete,
> there is no throwable, only ApplicationStatus.FAILED, and the data is cleaned
> up.
> 
> 
> ``
>   clusterComponent.getShutDownFuture().whenComplete(
>   (ApplicationStatus applicationStatus, Throwable throwable) -> {
>   if (throwable != null) {
>   shutDownAsync(
>   ApplicationStatus.UNKNOWN,
>   
> ExceptionUtils.stringifyException(throwable),
>   false);
>   } else {
>   // This is the general shutdown path. If a 
> separate more specific
> shutdown was
>   // already triggered, this will do nothing
>   shutDownAsync(
>   applicationStatus,
>   null,
>   true);
>   }
>   });
> }
> 
> ``
> 
> If you want to save your checkPoint,you could  refer to this document:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
> 
> Alternatively, you could change the code in
> org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster so
> that when it fails, the data is saved.
> 
> In fact, I'm wondering why it ignores the Throwable and deletes the HA data
> by default in every case. Could anyone help me answer this question?
> 
> Best,
> Husky Zeng
> 
> 
> 
> 
> 
> -
> Chinese,NanJing , Huawei.
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-05 Thread Husky Zeng


Hi Cristian,

From this code, we can see that the Exception or Error is ignored in
dispatcher.shutDownCluster(applicationStatus).

``
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster

return applicationCompletionFuture
    .handle((r, t) -> {
        final ApplicationStatus applicationStatus;
        if (t != null) {

            final Optional<JobCancellationException> cancellationException =
                ExceptionUtils.findThrowable(t, JobCancellationException.class);

            if (cancellationException.isPresent()) {
                // this means the Flink Job was cancelled
                applicationStatus = ApplicationStatus.CANCELED;
            } else if (t instanceof CancellationException) {
                // this means that the future was cancelled
                applicationStatus = ApplicationStatus.UNKNOWN;
            } else {
                applicationStatus = ApplicationStatus.FAILED;
            }

            LOG.warn("Application {}: ", applicationStatus, t);
        } else {
            applicationStatus = ApplicationStatus.SUCCEEDED;
            LOG.info("Application completed SUCCESSFULLY");
        }
        return dispatcher.shutDownCluster(applicationStatus);
    })
    .thenCompose(Function.identity());

``


So when it comes to java.util.concurrent.CompletableFuture#whenComplete,
there is no throwable, only ApplicationStatus.FAILED, and the data is cleaned
up.


``
clusterComponent.getShutDownFuture().whenComplete(
    (ApplicationStatus applicationStatus, Throwable throwable) -> {
        if (throwable != null) {
            shutDownAsync(
                ApplicationStatus.UNKNOWN,
                ExceptionUtils.stringifyException(throwable),
                false);
        } else {
            // This is the general shutdown path. If a separate more specific
            // shutdown was already triggered, this will do nothing
            shutDownAsync(
                applicationStatus,
                null,
                true);
        }
    });
}

``

If you want to keep your checkpoints, you could refer to this document:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html

Alternatively, you could change the code in
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster so that
when it fails, the data is saved.

In fact, I'm wondering why it ignores the Throwable and deletes the HA data
by default in every case. Could anyone help me answer this question?

Best,
Husky Zeng





-
Chinese,NanJing , Huawei.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-04 Thread Cristian


My suspicion is that somewhere in the path where it fails to connect to 
ZooKeeper, the exception is swallowed, so instead of running the shutdown path 
for when the job fails, the general shutdown path is taken. 

This was fortunately a job for which we had a savepoint from yesterday. 
Otherwise we would have been in serious trouble. 


On Fri, Sep 4, 2020, at 9:12 PM, Qingdong Zeng wrote:
> Hi Cristian,
> 
> In the log,we can see it went to the method
> shutDownAsync(applicationStatus,null,true);
>   
> ``   
> 2020-09-04 17:32:07,950 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
> StandaloneApplicationClusterEntryPoint down with application status FAILED.
> Diagnostics null.
> ``   
> 
> In the general shutdown path, cleaning up the HA data by default is normal.
> So the problem is not why we clean up the HA data in the general shutdown
> path, but why it went down the general shutdown path when your cluster failed.
> 
> I am going to have lunch, and plan to analyze the log in the afternoon.
> 
> Best,
> Qingdong Zeng
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-04 Thread Qingdong Zeng
Hi Cristian,

In the log,we can see it went to the method
shutDownAsync(applicationStatus,null,true);

``   
2020-09-04 17:32:07,950 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
StandaloneApplicationClusterEntryPoint down with application status FAILED.
Diagnostics null.
``   

In the general shutdown path, cleaning up the HA data by default is normal.
So the problem is not why we clean up the HA data in the general shutdown
path, but why it went down the general shutdown path when your cluster failed.

I am going to have lunch, and plan to analyze the log in the afternoon.

Best,
Qingdong Zeng



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-12 Thread Si-li Liu
Someone told me that maybe this issue is Mesos-specific. I'm kind of a
newbie in Flink, and I dug into the code but could not reach a conclusion.
All I really want is a better join window that emits the result and deletes
it from the window state immediately when a join succeeds. Is there any
other way? Thanks!
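
One common alternative (a sketch only, in Java for illustration; it is not from
this thread, the element types are simplified to String, and all class and
variable names are made up) is to connect the two keyed streams and do the join
in a CoProcessFunction, emitting and clearing state as soon as a match is found:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Joins a "bid" stream with a "show" stream per key: whichever side arrives
// first is buffered in keyed state; when the other side arrives, the joined
// result is emitted and the state is cleared right away. A cleanup timer
// bounds how long an unmatched element is kept (20 minutes here, mirroring
// the session gap from the thread). Assumes event-time timestamps are assigned.
public class EagerJoinFunction extends CoProcessFunction<String, String, String> {

    private static final long TTL_MS = 20 * 60 * 1000L;
    private transient ValueState<String> bidState;
    private transient ValueState<String> showState;

    @Override
    public void open(Configuration parameters) {
        bidState = getRuntimeContext().getState(new ValueStateDescriptor<>("bid", String.class));
        showState = getRuntimeContext().getState(new ValueStateDescriptor<>("show", String.class));
    }

    @Override
    public void processElement1(String bid, Context ctx, Collector<String> out) throws Exception {
        String show = showState.value();
        if (show != null) {
            out.collect(bid + "|" + show);   // joined: emit and drop state immediately
            showState.clear();
        } else {
            bidState.update(bid);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MS);
        }
    }

    @Override
    public void processElement2(String show, Context ctx, Collector<String> out) throws Exception {
        String bid = bidState.value();
        if (bid != null) {
            out.collect(bid + "|" + show);   // joined: emit and drop state immediately
            bidState.clear();
        } else {
            showState.update(show);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // drop anything that never found its partner within the TTL
        bidState.clear();
        showState.clear();
    }
}

It would be wired up roughly as
bidStream.connect(showStream).keyBy(bidKeySelector, showKeySelector).process(new EagerJoinFunction()),
with the usual event-time/watermark setup; the cleanup timer takes the place of
the long session-window gap.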

Congxian Qiu  于2020年7月11日周六 下午3:20写道:

> Hi Si-li
>
> Thanks for the notice.
> I just want to double-check is the original problem has been solved?  As I
> found that the created issue FLINK-18464 has been closed with reason "can
> not reproduce". Am I missing something here?
>
> Best,
> Congxian
>
>
> Si-li Liu  于2020年7月10日周五 下午6:06写道:
>
>> Sorry
>>
>> I can't reproduce it with reduce/aggregate/fold/apply and due to some
>> limitations in my working environment, I can't use flink 1.10 or 1.11.
>>
>> Congxian Qiu  于2020年7月5日周日 下午6:21写道:
>>
>>> Hi
>>>
>>> First, Could you please try this problem still there if use flink 1.10
>>> or 1.11?
>>>
>>> It seems strange, from the error message, here is an error when trying
>>> to convert a non-Window state(VoidNameSpace) to a Window State (serializer
>>> is the serializer of Window state, but the state is non-Window state).
>>> Could you please try to replace the MyFunction with a 
>>> reduce/aggregate/fold/apply()
>>> function to see what happens? -- this wants to narrow down the problem.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Si-li Liu  于2020年7月3日周五 下午6:44写道:
>>>
 Thanks for your help

 1. I started the job from scratch, not a savepoint or externalized
 checkpoint
 2. No job graph change
 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 4. My Flink version is 1.9.1

 Khachatryan Roman  于2020年7月3日周五 下午4:49写道:

> I still wasn't able to reproduce the issue.
>
> Can you also clarify:
> - Are you starting the job from a savepoint or externalized
> checkpoint?
> - If yes, was the job graph changed?
> - What StreamTimeCharacteristic is set, if any?
> - What exact version of Flink do you use?
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:
>
>> Hi, Thanks for your help.
>>
>> The checkpoint configuration is
>>
>> checkpoint.intervalMS=30
>> checkpoint.timeoutMS=30
>>
>> The error callstack is from JM's log, which happened in every cp.
>> Currently I don't have a success cp yet.
>>
>> Khachatryan Roman  于2020年7月3日周五
>> 上午3:50写道:
>>
>>> Hi,
>>>
>>> Thanks for the details.
>>> However, I was not able to reproduce the issue. I used parallelism
>>> levels 4, file system backend and tried different timings for
>>> checkpointing, windowing and source.
>>> Do you encounter this problem deterministically, is it always 1st
>>> checkpoint?
>>> What checkpointing interval do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:
>>>
 Hi, this is our production code so I have to modify it a little
 bit, such as variable name and function name. I think 3 classes I 
 provide
 here is enough.

 I try to join two streams, but I don't want to use the default join
 function, because I want to send the joined log immediately and remove 
 it
 from window state immediately. And my window gap time is very long( 20
 minutes), so it may be evaluated multiple times.

 class JoinFunction extends
   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{

   var ueState: ValueState[RawLog] = _
   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
   val invalidCounter = new LongCounter()
   val processCounter = new LongCounter()
   val sendToKafkaCounter = new LongCounter()

   override def open(parameters: Configuration): Unit = {
 ueState = getRuntimeContext.getState(
   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
 )
 gZipThriftSerializer = new GZipThriftSerializer[MyType]()
 getRuntimeContext.addAccumulator("processCounter", 
 this.processCounter)
 getRuntimeContext.addAccumulator("invalidCounter", 
 this.invalidCounter)
 getRuntimeContext.addAccumulator("sendToKafkaCounter", 
 this.sendToKafkaCounter)
   }

   override def process(key: String,
ctx: Context,
logs: Iterable[RawLog],
out: Collector[OutputLog]): Unit = {
 if (ueState.value() != null) {
   processCounter.add(1L)
   val bid = ueState.value()
   val bidLog = 
 gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
 classOf

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-11 Thread Congxian Qiu
Hi Si-li

Thanks for the notice.
I just want to double-check is the original problem has been solved?  As I
found that the created issue FLINK-18464 has been closed with reason "can
not reproduce". Am I missing something here?

Best,
Congxian


Si-li Liu  于2020年7月10日周五 下午6:06写道:

> Sorry
>
> I can't reproduce it with reduce/aggregate/fold/apply and due to some
> limitations in my working environment, I can't use flink 1.10 or 1.11.
>
> Congxian Qiu  于2020年7月5日周日 下午6:21写道:
>
>> Hi
>>
>> First, Could you please try this problem still there if use flink 1.10 or
>> 1.11?
>>
>> It seems strange, from the error message, here is an error when trying to
>> convert a non-Window state(VoidNameSpace) to a Window State (serializer is
>> the serializer of Window state, but the state is non-Window state).
>> Could you please try to replace the MyFuction with a 
>> reduce/aggregate/fold/apply()
>> function to see what happens? -- this wants to narrow down the problem.
>>
>> Best,
>> Congxian
>>
>>
>> Si-li Liu  于2020年7月3日周五 下午6:44写道:
>>
>>> Thanks for your help
>>>
>>> 1. I started the job from scratch, not a savepoint or externalized
>>> checkpoint
>>> 2. No job graph change
>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> 4. My Flink version is 1.9.1
>>>
>>> Khachatryan Roman  于2020年7月3日周五 下午4:49写道:
>>>
 I still wasn't able to reproduce the issue.

 Can you also clarify:
 - Are you starting the job from a savepoint or externalized checkpoint?
 - If yes, was the job graph changed?
 - What StreamTimeCharacteristic is set, if any?
 - What exact version of Flink do you use?

 Regards,
 Roman


 On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:

> Hi, Thanks for your help.
>
> The checkpoint configuration is
>
> checkpoint.intervalMS=30
> checkpoint.timeoutMS=30
>
> The error callstack is from JM's log, which happened in every cp.
> Currently I don't have a success cp yet.
>
> Khachatryan Roman  于2020年7月3日周五 上午3:50写道:
>
>> Hi,
>>
>> Thanks for the details.
>> However, I was not able to reproduce the issue. I used parallelism
>> levels 4, file system backend and tried different timings for
>> checkpointing, windowing and source.
>> Do you encounter this problem deterministically, is it always 1st
>> checkpoint?
>> What checkpointing interval do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:
>>
>>> Hi, this is our production code so I have to modify it a little bit,
>>> such as variable name and function name. I think 3 classes I provide 
>>> here
>>> is enough.
>>>
>>> I try to join two streams, but I don't want to use the default join
>>> function, because I want to send the joined log immediately and remove 
>>> it
>>> from window state immediately. And my window gap time is very long( 20
>>> minutes), so it may be evaluated multiple times.
>>>
>>> class JoinFunction extends
>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>
>>>   var ueState: ValueState[RawLog] = _
>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>   val invalidCounter = new LongCounter()
>>>   val processCounter = new LongCounter()
>>>   val sendToKafkaCounter = new LongCounter()
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>> ueState = getRuntimeContext.getState(
>>>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>> )
>>> gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>> getRuntimeContext.addAccumulator("processCounter", 
>>> this.processCounter)
>>> getRuntimeContext.addAccumulator("invalidCounter", 
>>> this.invalidCounter)
>>> getRuntimeContext.addAccumulator("sendToKafkaCounter", 
>>> this.sendToKafkaCounter)
>>>   }
>>>
>>>   override def process(key: String,
>>>ctx: Context,
>>>logs: Iterable[RawLog],
>>>out: Collector[OutputLog]): Unit = {
>>> if (ueState.value() != null) {
>>>   processCounter.add(1L)
>>>   val bid = ueState.value()
>>>   val bidLog = 
>>> gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
>>> classOf[MyType])
>>>   logs.foreach( log => {
>>> if (log.eventType == SHOW) {
>>>   val showLog = 
>>> gZipThriftSerializer.decompressAndDeserialize(log.payload, 
>>> classOf[MyType])
>>>   sendToKafkaCounter.add(1L)
>>>   out.collect(new OutputLog(ThriftUtils.serialize(showLog), 
>>> Utils.getOutputTopic(showLog)))
>>> }
>>>   })
>>> } else {
>>>   invalidCounter.add(1L)
>>> }
>>>   }
>>> }
>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-10 Thread Si-li Liu
Sorry

I can't reproduce it with reduce/aggregate/fold/apply and due to some
limitations in my working environment, I can't use flink 1.10 or 1.11.

Congxian Qiu  于2020年7月5日周日 下午6:21写道:

> Hi
>
> First, Could you please try this problem still there if use flink 1.10 or
> 1.11?
>
> It seems strange, from the error message, here is an error when trying to
> convert a non-Window state(VoidNameSpace) to a Window State (serializer is
> the serializer of Window state, but the state is non-Window state).
> Could you please try to replace the MyFunction with a 
> reduce/aggregate/fold/apply()
> function to see what happens? -- this wants to narrow down the problem.
>
> Best,
> Congxian
>
>
> Si-li Liu  于2020年7月3日周五 下午6:44写道:
>
>> Thanks for your help
>>
>> 1. I started the job from scratch, not a savepoint or externalized
>> checkpoint
>> 2. No job graph change
>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 4. My Flink version is 1.9.1
>>
>> Khachatryan Roman  于2020年7月3日周五 下午4:49写道:
>>
>>> I still wasn't able to reproduce the issue.
>>>
>>> Can you also clarify:
>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>> - If yes, was the job graph changed?
>>> - What StreamTimeCharacteristic is set, if any?
>>> - What exact version of Flink do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:
>>>
 Hi, Thanks for your help.

 The checkpoint configuration is

 checkpoint.intervalMS=30
 checkpoint.timeoutMS=30

 The error callstack is from JM's log, which happened in every cp.
 Currently I don't have a success cp yet.

 Khachatryan Roman  于2020年7月3日周五 上午3:50写道:

> Hi,
>
> Thanks for the details.
> However, I was not able to reproduce the issue. I used parallelism
> levels 4, file system backend and tried different timings for
> checkpointing, windowing and source.
> Do you encounter this problem deterministically, is it always 1st
> checkpoint?
> What checkpointing interval do you use?
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:
>
>> Hi, this is our production code so I have to modify it a little bit,
>> such as variable name and function name. I think 3 classes I provide here
>> is enough.
>>
>> I try to join two streams, but I don't want to use the default join
>> function, because I want to send the joined log immediately and remove it
>> from window state immediately. And my window gap time is very long( 20
>> minutes), so it may be evaluated multiple times.
>>
>> class JoinFunction extends
>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>
>>   var ueState: ValueState[RawLog] = _
>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>   val invalidCounter = new LongCounter()
>>   val processCounter = new LongCounter()
>>   val sendToKafkaCounter = new LongCounter()
>>
>>   override def open(parameters: Configuration): Unit = {
>> ueState = getRuntimeContext.getState(
>>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>> )
>> gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>> getRuntimeContext.addAccumulator("processCounter", 
>> this.processCounter)
>> getRuntimeContext.addAccumulator("invalidCounter", 
>> this.invalidCounter)
>> getRuntimeContext.addAccumulator("sendToKafkaCounter", 
>> this.sendToKafkaCounter)
>>   }
>>
>>   override def process(key: String,
>>ctx: Context,
>>logs: Iterable[RawLog],
>>out: Collector[OutputLog]): Unit = {
>> if (ueState.value() != null) {
>>   processCounter.add(1L)
>>   val bid = ueState.value()
>>   val bidLog = 
>> gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
>> classOf[MyType])
>>   logs.foreach( log => {
>> if (log.eventType == SHOW) {
>>   val showLog = 
>> gZipThriftSerializer.decompressAndDeserialize(log.payload, 
>> classOf[MyType])
>>   sendToKafkaCounter.add(1L)
>>   out.collect(new OutputLog(ThriftUtils.serialize(showLog), 
>> Utils.getOutputTopic(showLog)))
>> }
>>   })
>> } else {
>>   invalidCounter.add(1L)
>> }
>>   }
>> }
>>
>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>
>>   override def onElement(log: RawLog,
>>  timestamp: Long,
>>  window: TimeWindow,
>>  ctx: Trigger.TriggerContext): TriggerResult = {
>> val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>   new ValueStateDescriptor[RawLog](bidState, classOf[

Re: Checkpoint is disable, will history data in rocksdb be leak when job restart?

2020-07-05 Thread Congxian Qiu
Hi SmileSmile

As for the OOM problem, maybe you can try to get a memory dump before the OOM
happens; once you have the dump, you can see what consumes more memory than
expected.
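
One way to capture such a dump (just a sketch; the pid and paths are
placeholders, and note that a JVM heap dump does not cover RocksDB's native
memory):

# on-demand heap dump of a running TaskManager JVM
jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <taskmanager-pid>

# or let the JVM dump automatically on OutOfMemoryError, via flink-conf.yaml
env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"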

Best,
Congxian


Yun Tang  于2020年7月3日周五 下午3:04写道:

> Hi
>
> If you do not enable checkpointing, have you ever restored a checkpoint for
> the new job? As I said, the timers would also be restored and the
> event time triggers would fire, so that the following onEventTime() could
> also be triggered to clean up the history data.
>
> For the 2nd question: why does your job restart again and again? I think that
> problem should be considered first.
>
> Best
> Yun Tang
> --
> *From:* SmileSmile 
> *Sent:* Friday, July 3, 2020 14:30
> *To:* Yun Tang 
> *Cc:* 'user@flink.apache.org' 
> *Subject:* Re: Checkpoint is disable, will history data in rocksdb be
> leak when job restart?
>
> Hi Yun Tang,
>
> I don't enable checkpointing, so when my job restarts, how does Flink clean
> up the history state?
>
> My pod gets killed only after the job restarts again and again; in that
> case, I have to rebuild the Flink cluster.
>
>
>
>
>
> On 07/03/2020 14:22, Yun Tang  wrote:
> Hi
>
> If your job does not need checkpoints, why would you still restore your job
> from checkpoints?
>
> Actually, I did not totally understand what you want: are you afraid that
> the state restored from the last checkpoint would not be cleared? Since the
> event timers are also stored in the checkpoint, after you restore from a
> checkpoint the event time windows would also be triggered to clean up the
> history state.
>
> In the end, I think you just want to know why the pod is killed after some
> time? Please consider increasing the process memory to increase the JVM
> overhead and provide some more buffer space for native memory usage [1].
> Since Flink 1.10, RocksDB will steadily use 100% of the managed memory, and
> once the process needs some extra memory beyond that, the pod might be
> treated as out of memory and be killed.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
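
One possible knob for that (a sketch only; the sizes below are placeholders and
need to match your pod limits) is to give the JVM overhead budget more room in
flink-conf.yaml, which leaves extra headroom for native memory usage on top of
the managed memory that RocksDB consumes:

taskmanager.memory.process.size: 4096m
taskmanager.memory.jvm-overhead.min: 512m
taskmanager.memory.jvm-overhead.max: 1024m

The key names are from the Flink 1.10 memory model linked above.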
>
> Best
> Yun Tang
> --
> *From:* SmileSmile 
> *Sent:* Friday, July 3, 2020 14:01
> *To:* 'user@flink.apache.org' 
> *Subject:* Checkpoint is disable, will history data in rocksdb be leak
> when job restart?
>
>
> Hi
>
> My job runs on Flink 1.10.1 with event time. Container memory usage will
> rise by 2 GB after one restart, and the pod will be killed by the OS after
> several restarts.
>
> I find that history data is cleared when new data arrives, via the call to
> onEventTime() to clearAllState. But my job does not need checkpoints;
> when the job restarts, will the history data be left in off-heap memory and
> never be cleared?
>
> This case only happens when I use RocksDB; the heap backend is fine.
>
> Can anyone help me on how to deal with this?
>
>
>
>


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-05 Thread Congxian Qiu
Hi

First, Could you please try this problem still there if use flink 1.10 or
1.11?

It seems strange: from the error message, there is an error when trying to
convert a non-window state (VoidNamespace) to a window state (the serializer is
the serializer of the window state, but the state is a non-window state).
Could you please try to replace the MyFunction with a
reduce/aggregate/fold/apply()
function to see what happens? -- this is to narrow down the problem.
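
For reference, a stripped-down probe job along those lines (a sketch, not from
this thread; it assumes an event-time session window with the 20-minute gap
mentioned in the thread, and all types and values are made up):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReduceWindowProbe {

    // placeholder standing in for the RawLog type from this thread
    public static class RawLog {
        public String key;
        public long timestamp;
        public RawLog() {}
        public RawLog(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(30_000);

        env.fromElements(new RawLog("a", 1_000L), new RawLog("a", 2_000L), new RawLog("b", 3_000L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<RawLog>() {
                    @Override
                    public long extractAscendingTimestamp(RawLog log) {
                        return log.timestamp;
                    }
                })
                .keyBy(log -> log.key)
                // same keying and 20-minute session gap, but a built-in reduce
                // instead of the custom ProcessWindowFunction + Trigger pair
                .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
                .reduce((ReduceFunction<RawLog>) (a, b) -> a.timestamp >= b.timestamp ? a : b)
                .print();

        env.execute("reduce-window-probe");
    }
}

If this probe checkpoints cleanly, the problem is likely in the custom
window/trigger code; if it still fails, it points at the environment instead.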

Best,
Congxian


Si-li Liu  于2020年7月3日周五 下午6:44写道:

> Thanks for your help
>
> 1. I started the job from scratch, not a savepoint or externalized
> checkpoint
> 2. No job graph change
> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 4. My Flink version is 1.9.1
>
> Khachatryan Roman  于2020年7月3日周五 下午4:49写道:
>
>> I still wasn't able to reproduce the issue.
>>
>> Can you also clarify:
>> - Are you starting the job from a savepoint or externalized checkpoint?
>> - If yes, was the job graph changed?
>> - What StreamTimeCharacteristic is set, if any?
>> - What exact version of Flink do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu  wrote:
>>
>>> Hi, Thanks for your help.
>>>
>>> The checkpoint configuration is
>>>
>>> checkpoint.intervalMS=30
>>> checkpoint.timeoutMS=30
>>>
>>> The error callstack is from JM's log, which happened in every cp.
>>> Currently I don't have a success cp yet.
>>>
>>> Khachatryan Roman  于2020年7月3日周五 上午3:50写道:
>>>
 Hi,

 Thanks for the details.
 However, I was not able to reproduce the issue. I used parallelism
 levels 4, file system backend and tried different timings for
 checkpointing, windowing and source.
 Do you encounter this problem deterministically, is it always 1st
 checkpoint?
 What checkpointing interval do you use?

 Regards,
 Roman


 On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu  wrote:

> Hi, this is our production code so I have to modify it a little bit,
> such as variable name and function name. I think 3 classes I provide here
> is enough.
>
> I try to join two streams, but I don't want to use the default join
> function, because I want to send the joined log immediately and remove it
> from window state immediately. And my window gap time is very long( 20
> minutes), so it may be evaluated multiple times.
>
> class JoinFunction extends
>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>
>   var ueState: ValueState[RawLog] = _
>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>   val invalidCounter = new LongCounter()
>   val processCounter = new LongCounter()
>   val sendToKafkaCounter = new LongCounter()
>
>   override def open(parameters: Configuration): Unit = {
> ueState = getRuntimeContext.getState(
>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
> )
> gZipThriftSerializer = new GZipThriftSerializer[MyType]()
> getRuntimeContext.addAccumulator("processCounter", 
> this.processCounter)
> getRuntimeContext.addAccumulator("invalidCounter", 
> this.invalidCounter)
> getRuntimeContext.addAccumulator("sendToKafkaCounter", 
> this.sendToKafkaCounter)
>   }
>
>   override def process(key: String,
>ctx: Context,
>logs: Iterable[RawLog],
>out: Collector[OutputLog]): Unit = {
> if (ueState.value() != null) {
>   processCounter.add(1L)
>   val bid = ueState.value()
>   val bidLog = 
> gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
> classOf[MyType])
>   logs.foreach( log => {
> if (log.eventType == SHOW) {
>   val showLog = 
> gZipThriftSerializer.decompressAndDeserialize(log.payload, 
> classOf[MyType])
>   sendToKafkaCounter.add(1L)
>   out.collect(new OutputLog(ThriftUtils.serialize(showLog), 
> Utils.getOutputTopic(showLog)))
> }
>   })
> } else {
>   invalidCounter.add(1L)
> }
>   }
> }
>
> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>
>   override def onElement(log: RawLog,
>  timestamp: Long,
>  window: TimeWindow,
>  ctx: Trigger.TriggerContext): TriggerResult = {
> val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
> )
> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>   new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>
> if (!firstSeen.value()) {
>   ctx.registerEventTimeTimer(window.getEnd)
>   firstSeen.update(true)
> }
> val eventType
