Re: checkpoint upload thread

2024-08-01 Thread Yanfei Lei
Hi Enric,

Sorry for the confusion. I meant: "It can be done in theory, but whether it
happens in practice depends on the specific implementation of the file
system client."

I think there are two ways to let different tasks share a connection
(in other words, a socket):
1. Share one *Output Stream*;
2. Use different Output Streams, but have those OutputStreams share one
underlying socket (this is implemented by the specific file system
client).

Back to Flink: currently, different tasks do not share output
streams, but I am not sure whether the underlying sockets are shared
(see the sketch below).
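
To make the distinction concrete, here is a minimal sketch using Flink's
FileSystem API (the paths and the local `file://` scheme are only
illustrative; nothing here is taken from Flink's checkpointing code):

```java
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.net.URI;

public class SeparateStreamsSketch {
    public static void main(String[] args) throws Exception {
        // Two "tasks" resolve a FileSystem for the same scheme/authority.
        FileSystem fs = FileSystem.get(URI.create("file:///tmp"));

        // Each task writes its data through its own output stream (way 2 above)...
        FSDataOutputStream taskAStream =
                fs.create(new Path("/tmp/chk-1/task-a"), FileSystem.WriteMode.OVERWRITE);
        FSDataOutputStream taskBStream =
                fs.create(new Path("/tmp/chk-1/task-b"), FileSystem.WriteMode.OVERWRITE);

        // ...whether the two streams are multiplexed over one underlying socket
        // (e.g. via an HTTP connection pool in an S3/HDFS client) is decided
        // inside the concrete FileSystem implementation, not at this level.
        taskAStream.write(1);
        taskBStream.write(2);
        taskAStream.close();
        taskBStream.close();
    }
}
```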


Enric Ott <243816...@qq.com> 于2024年8月1日周四 10:04写道:

>
> Hi,Yanfei:
>   What do you mean by the word "possible" in the statement "it is possible to 
> use the same
> connection for an operator chain"? Do you mean it can be done but is not applied in 
> practice? Or that it is actually applied, but only with some probability?
>
>   Thanks.
>
>
> -- Original message --
> From: "Yanfei Lei" ;
> Sent: Tuesday, July 30, 2024, 5:15 PM
> To: "Enric Ott"<243816...@qq.com>;
> Cc: "user";
> Subject: Re: checkpoint upload thread
>
> Hi Enric,
>
> If I understand correctly, one subtask would use one
> `asyncOperationsThreadPool`[1,2], it is possible to use the same
> connection for an operator chain.
>
> [1] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L443
> [2] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L716
>
> Enric Ott <243816...@qq.com> 于2024年7月30日周二 11:11写道:
> >
> > Hi,Community:
> >   Does Flink upload states and in-flight buffers within the same 
> > operator chain using the same connection (instead of one connection per 
> > operator)?
>
>
>
> --
> Best,
> Yanfei



--
Best,
Yanfei


Re: checkpoint upload thread

2024-07-30 Thread Yanfei Lei
Hi Enric,

If I understand correctly, one subtask would use one
`asyncOperationsThreadPool`[1,2], it is possible to use the same
connection for an operator chain.

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L443
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L716

Enric Ott <243816...@qq.com> 于2024年7月30日周二 11:11写道:
>
> Hi,Community:
>   Does Flink upload states and in-flight buffers within the same operator chain 
> using the same connection (instead of one connection per operator)?



-- 
Best,
Yanfei


Re: State leak in tumbling windows

2024-06-06 Thread Yanfei Lei
Hi Adam,

Is your job a DataStream job or a SQL job? After looking through the
window-related code (I'm not particularly familiar with this part of
the code), I believe this problem should only exist in the DataStream API.

Adam Domanski  于2024年6月3日周一 16:54写道:
>
> Dear Flink users,
>
> I spotted the ever growing checkpoint size in my Flink application which uses 
> tumble windows.
>
> I found such a ticket: https://issues.apache.org/jira/browse/FLINK-33192, but 
> no comments.
>
> Can somebody confirm the issue?
>
> BR,
> Adam.
>
>


-- 
Best,
Yanfei


Re: TTL issue with large RocksDB keyed state

2024-06-03 Thread Yanfei Lei
Hi,

> 1. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDB compaction is ineffective because we never 
> update key values? The state is nearly fully composed of the key space. Do keys 
> not get freed by the RocksDB compaction filter for TTL?

Regarding TTL-related questions, has your job been running for 30
days? TTL is checked based on the last time a key was created or
updated.

Regarding “I'm wondering if RocksDB compaction is ineffective because we
never update key values”: periodic compaction can speed up the cleanup of
expired state entries, especially for entries that are rarely accessed[1];
maybe you can try setting it (see the snippet after the links below). BTW,
are there any deletion operations in your job?

> 2. That should work but will doing that "reset the clock" for the TTL?

No, the TTL timestamp is stored as part of the KV entry and therefore will
not be reset. I think you can try "TTL periodic compaction"[1] first.

3. Yes, restoring from a canonical savepoint can bypass FLINK-34050,
and a canonical savepoint should be generated first.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state
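
In case a concrete snippet helps, here is a minimal sketch of enabling the
RocksDB compaction-filter cleanup on a map state. The 30-day TTL, the state
name and the key/value types are only illustrative, and newer Flink versions
also expose an overload of cleanupInRocksdbCompactFilter that additionally
takes a periodic compaction interval:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))                 // illustrative TTL
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        // Ask the RocksDB compaction filter to refresh its notion of "now"
        // after every 1000 processed entries; expired entries are then
        // dropped during compaction.
        .cleanupInRocksdbCompactFilter(1000)
        .build();

MapStateDescriptor<Long, byte[]> eventsByMicros =
        new MapStateDescriptor<>("events-by-micros", Long.class, byte[].class);
eventsByMicros.enableTimeToLive(ttlConfig);
```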

Cliff Resnick  于2024年6月3日周一 02:44写道:
>
> Hi everyone,
>
>
> We have a Flink application that has a very large and perhaps unusual state. 
> The basic shape of it is a very large and somewhat random keyed-stream 
> partition space, each with a continuously growing map-state keyed by 
> microsecond time Long values. There are never any overwrites in the map state 
> which is monotonic per partition key.  Map state was chosen over list state 
> in the hope that we can manage a sliding window using TTL. Using RocksDB 
> incremental checkpointing, the app runs very well despite the large total 
> checkpoint size. Our current checkpoint size is 3.2TB.
>
>
> We have multiple questions around space amplification problems when using the 
> RocksDB backend and I'm wondering if anyone can suggest or confirm answers.
>
>
>
> 1. Using LEVEL compaction we have not seen any decrease in total checkpoint 
> size through TTL compaction. To test the TTL, I cut the period from 60 to 30 
> days (we have well over 60 days processing time), enabled 
> cleanupFullSnapshot() and ran a test job without incremental checkpointing 
> enabled. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDB compaction is ineffective because we never update 
> key values? The state is nearly fully composed of the key space. Do keys not 
> get freed by the RocksDB compaction filter for TTL?
>
> 2. I'm wondering if FIFO compaction is a solution for the above. To move to 
> that we will need to first take a canonical savepoint and then redeploy with 
> RocksDB/FIFO. That should work, but will doing that "reset the clock" for the 
> TTL? Given its nature though, I am leaning toward this as our only option.
>
>
> 3. Rescaling is a problem because of this issue: 
> https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not 
> yet released. Because of this bug the checkpoint size scales somewhat larger 
> than is proportionate to the job rescaling. For example, if we go from 44 
> slots to 60, the checkpoint will scale from 3.2 TB to 4.9 TB. Before 1.19.1 
> is released, we can cherry-pick the fix and create our own Docker image, or will 
> restoring from a canonical savepoint as described above sidestep this bug?
>
>
> If anyone can help with any insights, please do!
>
>



-- 
Best,
Yanfei


Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-23 Thread Yanfei Lei
Hi JM,

> why having "transactional.id.expiration.ms" < "transaction.timeout.ms" helps

When recovering a job from a checkpoint/savepoint that contains Kafka
transactions, Flink will try to re-commit those transactions, based on
their transaction IDs, upon recovery.
If those transactions time out or the transactional IDs expire, the
re-commit may fail due to a transactional-id mismatch.
IIUC, if we set "transactional.id.expiration.ms" <
"transaction.timeout.ms", it allows the transactional id to be reset, but
this will cause data loss.

[1][2] would be helpful to understand what happened.

[1] 
https://issues.apache.org/jira/browse/FLINK-16419?focusedCommentId=17624315&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17624315
[2] 
https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
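
For reference, here is a minimal sketch of how the sink side is usually
configured for EXACTLY_ONCE. The broker address, topic, id prefix and the
15-minute value are placeholders, and transaction.timeout.ms must stay below
the broker-side transaction.max.timeout.ms:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                    // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                      // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Must not exceed the broker's transaction.max.timeout.ms.
        .setProperty("transaction.timeout.ms", "900000")       // 15 minutes
        // Required for EXACTLY_ONCE so transactional ids stay stable across restarts.
        .setTransactionalIdPrefix("my-app")                    // placeholder
        .build();
```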

Jean-Marc Paulin  于2024年4月23日周二 16:45写道:
>
> Thanks for your insight.
>
> I am still trying to understand exactly what happens here. We currently have 
> the default settings in Kafka, and we set "transaction.timeout.ms" to 15 
> minutes (which also happens to be the default "transaction.max.timeout.ms").  
> My expectation would be that if our savepoint is more than 15 minutes old it 
> would fail, but that is not the case.
>
> I still think we need to extend "transaction.max.timeout.ms" to something 
> like 7 days, as a 7-day-old savepoint is effectively worthless, and 
> probably adjust "transaction.timeout.ms" to be close to this.
>
> But can you explain how "transactional.id.expiration.ms" influences the 
> InvalidPidMappingException, or why having "transactional.id.expiration.ms" < 
> "transaction.timeout.ms" helps?
>
> Kind regards
>
> Jean-Marc
>
>
> 
> From: Yanfei Lei 
> Sent: Monday, April 22, 2024 03:28
> To: Jean-Marc Paulin 
> Cc: user@flink.apache.org 
> Subject: [EXTERNAL] Re: Flink 1.18: Unable to resume from a savepoint with 
> error InvalidPidMappingException
>
> Hi JM,
>
> Yes, `InvalidPidMappingException` occurs because the transaction is
> lost in most cases.
>
> For short-term, " transaction.timeout.ms" >
> "transactional.id.expiration.ms" can ignore the
> `InvalidPidMappingException`[1].
> For long-term, FLIP-319[2] provides a solution.
>
> [1] 
> https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
> Jean-Marc Paulin  于2024年4月20日周六 02:30写道:
> >
> > Hi,
> >
> > we use Flink 1.18 with the Kafka Sink, and we enabled `EXACTLY_ONCE` on one of 
> > our Kafka sinks. We set the transaction timeout to 15 minutes. When we try to 
> > restore from a savepoint, well after that 15-minute window, Flink enters 
> > a RESTARTING loop. We see the error:
> >
> > ```
> > {
> >   "exception": {
> > "exception_class": 
> > "org.apache.kafka.common.errors.InvalidPidMappingException",
> > "exception_message": "The producer attempted to use a producer id which 
> > is not currently assigned to its transactional id.",
> > "stacktrace": 
> > "org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> > attempted to use a producer id which is not currently assigned to its 
> > transactional id.\n"
> >   },
> >   "@version": 1,
> >   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
> >   "message": "policy-exec::schedule-policy-execution -> 
> > (policy-exec::select-kafka-async-policy-stages, 
> > policy-exec::select-async-policy-stages -> 
> > policy-exec::execute-async-policy-stages, 
> > policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: 
> > policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: 
> > story-notifications-output, Sink: alerts-output, Sink: alerts-changes, 
> > Sink: connector-alerts, Sink: updated-events-output, Sink: stories-output, 
> > Sink: runbook-execution-requests) (6/6) 
> > (3f8cb042c1aa628891c66a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) 
> > switched from INITIALIZING to FAILED on 
> > aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c
> >  @ 
> > aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local
> >  (dataPort=6121)."

Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-21 Thread Yanfei Lei
Hi JM,

Yes, `InvalidPidMappingException` occurs because the transaction is
lost in most cases.

For short-term, " transaction.timeout.ms" >
"transactional.id.expiration.ms" can ignore the
`InvalidPidMappingException`[1].
For long-term, FLIP-319[2] provides a solution.

[1] 
https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

Jean-Marc Paulin  于2024年4月20日周六 02:30写道:
>
> Hi,
>
> we use Flink 1.18 with the Kafka Sink, and we enabled `EXACTLY_ONCE` on one of 
> our Kafka sinks. We set the transaction timeout to 15 minutes. When we try to 
> restore from a savepoint, well after that 15-minute window, Flink enters a 
> RESTARTING loop. We see the error:
>
> ```
> {
>   "exception": {
> "exception_class": 
> "org.apache.kafka.common.errors.InvalidPidMappingException",
> "exception_message": "The producer attempted to use a producer id which 
> is not currently assigned to its transactional id.",
> "stacktrace": "org.apache.kafka.common.errors.InvalidPidMappingException: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.\n"
>   },
>   "@version": 1,
>   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
>   "message": "policy-exec::schedule-policy-execution -> 
> (policy-exec::select-kafka-async-policy-stages, 
> policy-exec::select-async-policy-stages -> 
> policy-exec::execute-async-policy-stages, 
> policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: 
> policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: 
> story-notifications-output, Sink: alerts-output, Sink: alerts-changes, Sink: 
> connector-alerts, Sink: updated-events-output, Sink: stories-output, Sink: 
> runbook-execution-requests) (6/6) 
> (3f8cb042c1aa628891c66a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) 
> switched from INITIALIZING to FAILED on 
> aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c
>  @ 
> aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local
>  (dataPort=6121).",
>   "thread_name": "flink-pekko.actor.default-dispatcher-18",
>   "@timestamp": "2024-04-19T11:11:05.169+",
>   "level": "INFO",
>   "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
> }
> ```
> As much as I understanding the transaction is lost, would it be possible to 
> ignore this particular error and resume the job anyway?
>
> Thanks for any suggestions
>
> JM
>
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU



-- 
Best,
Yanfei


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Yanfei Lei
Congratulations!

Best,
Yanfei

Zhanghao Chen  于2024年3月28日周四 19:59写道:
>
> Congratulations!
>
> Best,
> Zhanghao Chen
> 
> From: Yu Li 
> Sent: Thursday, March 28, 2024 15:55
> To: d...@paimon.apache.org 
> Cc: dev ; user 
> Subject: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project
>
> CC the Flink user and dev mailing list.
>
> Paimon originated within the Flink community, initially known as Flink
> Table Store, and all our incubating mentors are members of the Flink
> Project Management Committee. I am confident that the bonds of
> enduring friendship and close collaboration will continue to unite the
> two communities.
>
> And congratulations all!
>
> Best Regards,
> Yu
>
> On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
> >
> > Congratulations!
> >
> > Best,
> > Guojun
> >
> > On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
> >
> > > Congratulations~
> > >
> > > > 2024年3月27日 15:54,王刚  写道:
> > > >
> > > > Congratulations~
> > > >
> > > >> 2024年3月26日 10:25,Jingsong Li  写道:
> > > >>
> > > >> Hi Paimon community,
> > > >>
> > > >> I’m glad to announce that the ASF board has approved a resolution to
> > > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > > >> your help to get to this point.
> > > >>
> > > >> I just created an issue to track the things we need to modify [2],
> > > >> please comment on it if you feel that something is missing. You can
> > > >> refer to apache documentation [1] too.
> > > >>
> > > >> And, we already completed the GitHub repo migration [3], please update
> > > >> your local git repo to track the new repo [4].
> > > >>
> > > >> You can run the following command to complete the remote repo tracking
> > > >> migration.
> > > >>
> > > >> git remote set-url origin https://github.com/apache/paimon.git
> > > >>
> > > >> If you have a different name, please change the 'origin' to your remote
> > > name.
> > > >>
> > > >> Please join me in celebrating!
> > > >>
> > > >> [1]
> > > https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > > >> [2] https://github.com/apache/paimon/issues/3091
> > > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > > >> [4] https://github.com/apache/paimon
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > >
> > >


Re: Flink job unable to restore from savepoint

2024-03-27 Thread Yanfei Lei
Hi Prashant,

Compared to the job that generated the savepoint, are there any changes in
the new job? For example, were data fields added or deleted, or was the
type serializer changed?
More detailed JobManager logs would help.

prashant parbhane  于2024年3月27日周三 14:20写道:
>
> Hello,
>
> We have been facing this weird issue of not being able to restore from 
> savepoint, when we have a significant load on flink jobs.
>
> "org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> user key.
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)
>
> Caused by: java.io.EOFException
>
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
>
> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492"
>
>
> Any suggestions?
>
>
> Thanks,
> Prashant
>
>
>
>


-- 
Best,
Yanfei


Re: There is no savepoint operation with triggerId

2024-03-25 Thread Yanfei Lei
Hi Lars,

These look like the logs produced when querying the status of a savepoint.
Have you been polling savepoint status frequently through the REST interface?

Lars Skjærven  于2024年3月26日周二 07:17写道:
>
> Hello,
> My job manager is constantly complaining with the following error:
>
> "Exception occurred in REST handler: There is no savepoint operation with 
> triggerId= for job bc0cb60b710e64e23195aa1610ad790a".
>
> "logger_name":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler"
>
> Checkpointing and savepointing seems to work as expected, but my logs are 
> flooded of these errors. Any tips ?
>
> L



-- 
Best,
Yanfei


Re: Is there any options to control the file names in file sink

2024-03-20 Thread Yanfei Lei
Hi Lasse,
If you are using the DataStream API, you can try setting an `OutputFileConfig`
on the file sink, and a custom `BucketAssigner` (sketched after the links below)
can derive the output sub-directory from the data itself. Something like [1]:
```
OutputFileConfig config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build();

FileSink<String> sink = FileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build();

```

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/filesystem/#bucket-assignment
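
The `KeyBucketAssigner` referenced above is not shipped with Flink; a minimal
sketch could look like the following, assuming String elements whose first
comma-separated field should decide the output sub-directory (only the bucket
directory can be derived from the data this way, while the part-file
prefix/suffix stays fixed per sink):

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class KeyBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        // Every record whose first field matches ends up in the same
        // sub-directory, so the directory layout follows the data.
        return element.split(",")[0];
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```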

Lasse Nedergaard  于2024年3月20日周三 20:50写道:
>
> Hi.
>
> Does anyone know if it’s possible to control the file names, e.g. change the UUID 
> file names and extensions to something else? I know I can control the path, 
> but I would like to be able to set the file name and the extension based on 
> the data in the stream. So I can’t use any general prefix or postfix that is 
> applied to all files.
>
> All ideas appreciate
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>


-- 
Best,
Yanfei


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Yanfei Lei
Congrats, thanks for the great work!

Sergey Nuyanzin  于2024年3月18日周一 19:30写道:
>
> Congratulations, thanks release managers and everyone involved for the great 
> work!
>
> On Mon, Mar 18, 2024 at 12:15 PM Benchao Li  wrote:
>>
>> Congratulations! And thanks to all release managers and everyone
>> involved in this release!
>>
>> Yubin Li  于2024年3月18日周一 18:11写道:
>> >
>> > Congratulations!
>> >
>> > Thanks to release managers and everyone involved.
>> >
>> > On Mon, Mar 18, 2024 at 5:55 PM Hangxiang Yu  wrote:
>> > >
>> > > Congratulations!
>> > > Thanks release managers and all involved!
>> > >
>> > > On Mon, Mar 18, 2024 at 5:23 PM Hang Ruan  wrote:
>> > >
>> > > > Congratulations!
>> > > >
>> > > > Best,
>> > > > Hang
>> > > >
>> > > > Paul Lam  于2024年3月18日周一 17:18写道:
>> > > >
>> > > > > Congrats! Thanks to everyone involved!
>> > > > >
>> > > > > Best,
>> > > > > Paul Lam
>> > > > >
>> > > > > > 2024年3月18日 16:37,Samrat Deb  写道:
>> > > > > >
>> > > > > > Congratulations !
>> > > > > >
>> > > > > > On Mon, 18 Mar 2024 at 2:07 PM, Jingsong Li 
>> > > > > > 
>> > > > > wrote:
>> > > > > >
>> > > > > >> Congratulations!
>> > > > > >>
>> > > > > >> On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> 
>> > > > > >> wrote:
>> > > > > >>>
>> > > > > >>> Congratulations, thanks for the great work!
>> > > > > >>>
>> > > > > >>> Best,
>> > > > > >>> Rui
>> > > > > >>>
>> > > > > >>> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
>> > > > > >>> 
>> > > > > >> wrote:
>> > > > > 
>> > > > >  The Apache Flink community is very happy to announce the 
>> > > > >  release of
>> > > > > >> Apache Flink 1.19.0, which is the first release for the Apache 
>> > > > > >> Flink
>> > > > > 1.19
>> > > > > >> series.
>> > > > > 
>> > > > >  Apache Flink® is an open-source stream processing framework for
>> > > > > >> distributed, high-performing, always-available, and accurate data
>> > > > > streaming
>> > > > > >> applications.
>> > > > > 
>> > > > >  The release is available for download at:
>> > > > >  https://flink.apache.org/downloads.html
>> > > > > 
>> > > > >  Please check out the release blog post for an overview of the
>> > > > > >> improvements for this bugfix release:
>> > > > > 
>> > > > > >>
>> > > > >
>> > > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> > > > > 
>> > > > >  The full release notes are available in Jira:
>> > > > > 
>> > > > > >>
>> > > > >
>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>> > > > > 
>> > > > >  We would like to thank all contributors of the Apache Flink
>> > > > community
>> > > > > >> who made this release possible!
>> > > > > 
>> > > > > 
>> > > > >  Best,
>> > > > >  Yun, Jing, Martijn and Lincoln
>> > > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > Best,
>> > > Hangxiang.
>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards,
> Sergey



-- 
Best,
Yanfei


Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread Yanfei Lei
Hi Jacob,

> I have multiple upstream sources to connect to depending on the business 
> model which are not Kafka. Based on criticality of the system and publisher 
> dependencies, we cannot switch to Kafka for these.

Sounds like you want to implement some custom connectors; [1][2] may
be helpful for implementing a custom Flink Table API connector.

Specifically, in terms of “Flink Checkpoint & Offset Commit”, the
custom source needs to implement the `SourceReader` interface, and you
can override `snapshotState()` and `notifyCheckpointComplete()` in
your implementation (a rough sketch follows the links below).
[3] is the related code of the Kafka connector for the DataStream API; [4]
is the related code of the Kafka connector for the Table API & SQL.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] 
https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354
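
To make this concrete, here is a minimal, non-production sketch (the class
names `TcpSocketSourceReader`/`TcpSplit` and the commented-out
`externalClient` call are hypothetical, not part of Flink) showing where
offset handling typically hooks into `snapshotState()` and
`notifyCheckpointComplete()`:

```java
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical split that only carries the reader's current offset. */
class TcpSplit implements SourceSplit {
    final long offset;

    TcpSplit(long offset) {
        this.offset = offset;
    }

    @Override
    public String splitId() {
        return "tcp-split-0";
    }
}

public class TcpSocketSourceReader implements SourceReader<String, TcpSplit> {

    private final Queue<Long> offsetsPerCheckpoint = new ArrayDeque<>();
    private long currentOffset;

    @Override
    public void start() { /* open the TCP connection here */ }

    @Override
    public InputStatus pollNext(ReaderOutput<String> output) {
        // Read one record from the socket, emit it and advance the offset:
        // output.collect(record); currentOffset++;
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<TcpSplit> snapshotState(long checkpointId) {
        // The returned splits become part of Flink's checkpoint state.
        offsetsPerCheckpoint.add(currentOffset);
        return Collections.singletonList(new TcpSplit(currentOffset));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Invoked only after the checkpoint is durable; this is where a
        // Kafka-like source commits its offset to the external system.
        Long committedOffset = offsetsPerCheckpoint.poll();
        // externalClient.commit(committedOffset);  // hypothetical client call
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void addSplits(List<TcpSplit> splits) { /* restore the offset from splits */ }

    @Override
    public void notifyNoMoreSplits() {}

    @Override
    public void close() { /* close the TCP connection */ }
}
```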

xia rui  于2024年3月8日周五 10:12写道:
>
> Hi Jacob.
>
> Flink uses "notification" to let an operator callback the completion of a 
> checkpoint. After gathering all checkpoint done messages from TMs, JM sends a 
> "notify checkpoint completed" RPC to all TMs. Operators will handle this 
> notification, where checkpoint success callbacks are invoked. For example, 
> Kafka sources commit the current consuming offset. I think this doc 
> (https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
>  may be helpful.
>
> You can override the `notifyCheckpointComlete()` to customize the behavior of 
> handling checkpoint completion.
>
> Best regards Rui Xia
>
> On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings  
> wrote:
>>
>>
>> Hello,
>>
>> I am implementing proof-of-concept Flink real-time streaming solutions.
>>
>> I came across below lines in out-of-the-box Flink Kafka connector documents.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
>> Consumer Offset Committing #
>>
>> Kafka source commits the current consuming offset when checkpoints are 
>> completed, for ensuring the consistency between Flink’s checkpoint state and 
>> committed offsets on Kafka brokers.
>>
>>
>> How is Flink able to control the callbacks from checkpointing? Is there a 
>> way to override this in my implementations? I have multiple upstream 
>> sources to connect to, depending on the business model, which are not Kafka. 
>> Based on the criticality of the system and publisher dependencies, we cannot 
>> switch to Kafka for these. So I was hoping to do the same thing the Kafka 
>> connector is doing.
>>
>>
>> Cheers,
>>
>> JR



-- 
Best,
Yanfei


Re: SecurityManager in Flink

2024-03-06 Thread Yanfei Lei
Hi Kirti Dhar,
What is your java version? I guess this problem may be related to
FLINK-33309[1]. Maybe you can try adding "-Djava.security.manager" to
the java options.

[1] https://issues.apache.org/jira/browse/FLINK-33309

Kirti Dhar Upadhyay K via user  于2024年3月6日周三 18:10写道:
>
> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue: if the source directory does not have read permission, it 
> returns the list of files as null instead of throwing a permission exception 
> (refer to the highlighted line below), resulting in an NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue, I found that the SecurityManager is null while 
> listing the files, hence the permission check on the directory is skipped.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>



-- 
Best,
Yanfei


Re: What to do about local disks with RocksDB with Kubernetes Operator

2023-10-18 Thread Yanfei Lei
Hi Alex,

AFAIK, the emptyDir[1] can be used directly as local disks, and
emptyDir can be defined by referring to this pod template[2].

If you want to use local disks through a PV, you can first create a
StatefulSet and mount the PV through volume claim templates[3]; the
example “Local Recovery Enabled TaskManager StatefulSet”[4] provided
in the docs may be useful.

[1] https://kubernetes.io/docs/concepts/storage/volumes/#emptydir
[2] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
[3] 
https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#volume-claim-templates
[4] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#local-recovery-enabled-taskmanager-statefulset

Best,
Yanfei

Alex Craig  于2023年10月19日周四 02:57写道:
>
> The recommended practice for RocksDB usage is to have local disks accessible 
> to it. The Kubernetes Operator doesn’t have fields related to creating disks 
> for RocksDB to use.
>
>
>
> For instance, say I have maxParallelism=10 but parallelism=1. I have a 
> statically created PVC named “flink-rocksdb”. The first TaskManager spins up 
> and mounts this PVC. But successive ones fail to start because there is no 
> PVC for them to mount.
>
>
>
> Has anybody solved this? Seems like a big issue with using Flink in 
> Kubernetes…


Re: Failure to restore from last completed checkpoint

2023-09-07 Thread Yanfei Lei
Hey Jacqlyn,
According to the stack trace, it seems that there is a problem when
the checkpoint is triggered. Does this problem occur after the restore?
Would you like to share some logs related to restoring?

Best,
Yanfei

Jacqlyn Bender via user  于2023年9月8日周五 05:11写道:
>
> Hey folks,
>
>
> We experienced a pipeline failure where our job manager restarted and we were 
> for some reason unable to restore from our last successful checkpoint. We had 
> regularly completed checkpoints every 10 minutes up to this failure and 0 
> failed checkpoints logged. Using Flink version 1.17.1.
>
>
> Wondering if anyone can shed light on what might have happened?
>
>
> Here's the error from our logs:
>
>
> Message: FATAL: Thread ‘Checkpoint Timer’ produced an uncaught exception. 
> Stopping the process...
>
>
> extendedStackTrace: java.util.concurrent.CompletionException: 
> java.util.concurrent.CompletionException: java.lang.NullPointerException
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:669)
>  ~[a-pipeline-name.jar:1.0]
>
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>  ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>  ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  [?:?]
>
> at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) 
> [?:?]
>
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910)
>  [?:?]
>
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  [?:?]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
> [?:?]
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
>
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.NullPointerException
>
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>  ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932) 
> ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>  ~[?:?]
>
> ... 7 more
>
> Caused by: java.lang.NullPointerException
>
> at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.abortCurrentTriggering(OperatorCoordinatorHolder.java:399)
>  ~[a-pipeline-name.jar:1.0]
>
> at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
>
> at 
> java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) 
> ~[?:?]
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:947)
>  ~[a-pipeline-name.jar:1.0]
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:923)
>  ~[a-pipeline-name.jar:1.0]
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:655)
>  ~[a-pipeline-name.jar:1.0]
>
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) 
> ~[?:?]
>
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>  ~[?:?]
>
> ... 7 more
>
>


Re: Query around Rocksdb

2023-07-05 Thread Yanfei Lei
Hi neha,

1. You can add the path of jemalloc to the LD_LIBRARY_PATH used by YARN[1],
and here is a blog post about "RocksDB Memory Usage"[2].
2. The default value of cleanupInRocksdbCompactFilter is 1000[3];
maybe another value can be set according to the TPS of the job. The
value of `state.backend.rocksdb.metrics.num-running-compactions`[4] may
be affected by the sampling frequency of metrics; is the value of
`state.backend.rocksdb.metrics.compaction-read-bytes` also zero?

[1] 
https://data-flair.training/forums/topic/how-to-include-native-libraries-in-yarn-jobs/
[2] https://shopify.engineering/optimizing-apache-flink-applications-tips
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#cleanup-during-rocksdb-compaction
[4] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
neha goyal  于2023年7月4日周二 17:39写道:

>
> Hello Yanfei and Shammon,
>
> I have two additional questions. The links mentioned in the reply talk about 
> using jemalloc in a Docker image, but I am using Yarn on AWS EMR. How can I 
> use jemalloc with Yarn? Any references you can provide would be greatly 
> appreciated.
>
> StateTtlConfig cityTtlConfig = StateTtlConfig
> 
> .newBuilder(org.apache.flink.api.common.time.Time.hours(longerLookbackHours))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build();
> MapStateDescriptor<String, Integer> cityListDescriptor = new 
> MapStateDescriptor<>("cityList", String.class, Integer.class);
> cityListDescriptor.enableTimeToLive(cityTtlConfig);
>
>
> Secondly, I have applied TTL to my state, and I rely on RocksDB's automated 
> compaction process to clear expired events. However, according to the RocksDB 
> metrics provided by Flink, the compaction process is not occurring[attached 
> the metrics screenshot], and there is a constant increase in the savepoint 
> size. Do you suggest adding cleanupInRocksdbCompactFilter(1000) as well? What 
> will be the impact of this configuration?
>
> On Tue, Jul 4, 2023 at 8:13 AM Yanfei Lei  wrote:
>>
>> Hi neha,
>>
>> Due to a limitation of RocksDB, we cannot create a
>> strict-capacity-limit LRUCache which is shared among RocksDB instance(s);
>> FLINK-15532[1] was created to track this.
>> BTW, have you set a TTL for this job[2]? TTL can help control the state size.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15532
>> [2]https://issues.apache.org/jira/browse/FLINK-31089
>>
>> Shammon FY  于2023年7月4日周二 09:08写道:
>> >
>> > Hi neha,
>> >
>> > Which flink version are you using? We have also encountered the issue of 
>> > continuous growth of off-heap memory in the TM of the session cluster 
>> > before, the reason is that the memory fragments cannot be reused like 
>> > issue [1]. You can check the memory allocator and try to use jemalloc 
>> > instead refer to doc [2] and [3].
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19125
>> > [2] 
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
>> > [3] 
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>> >>
>> >> Hello,
>> >>
>> >> I am trying to debug the unbounded memory consumption by the Flink 
>> >> process. The heap size of the process remains the same. The size of the 
>> >> RSS of the process keeps on increasing. I suspect it might be because of 
>> >> RocksDB.
>> >>
>> >> we have the default value for state.backend.rocksdb.memory.managed as 
>> >> true. Can anyone confirm that this config will Rockdb be able to take the 
>> >> unbounded native memory?
>> >>
>> >> If yes, what metrics can I check to confirm the issue? Any help would be 
>> >> appreciated.
>>
>>
>>
>> --
>> Best,
>> Yanfei



--
Best,
Yanfei


Re: Query around Rocksdb

2023-07-03 Thread Yanfei Lei
Hi neha,

Due to a limitation of RocksDB, we cannot create a
strict-capacity-limit LRUCache which is shared among RocksDB instance(s);
FLINK-15532[1] was created to track this.
BTW, have you set a TTL for this job[2]? TTL can help control the state size.

[1] https://issues.apache.org/jira/browse/FLINK-15532
[2]https://issues.apache.org/jira/browse/FLINK-31089

Shammon FY  于2023年7月4日周二 09:08写道:
>
> Hi neha,
>
> Which flink version are you using? We have also encountered the issue of 
> continuous growth of off-heap memory in the TM of the session cluster before, 
> the reason is that the memory fragments cannot be reused like issue [1]. You 
> can check the memory allocator and try to use jemalloc instead refer to doc 
> [2] and [3].
>
> [1] https://issues.apache.org/jira/browse/FLINK-19125
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
> [3] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>
> Best,
> Shammon FY
>
> On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>>
>> Hello,
>>
>> I am trying to debug the unbounded memory consumption by the Flink process. 
>> The heap size of the process remains the same. The size of the RSS of the 
>> process keeps on increasing. I suspect it might be because of RocksDB.
>>
>> we have the default value for state.backend.rocksdb.memory.managed as true. 
>> Can anyone confirm that this config will Rockdb be able to take the 
>> unbounded native memory?
>>
>> If yes, what metrics can I check to confirm the issue? Any help would be 
>> appreciated.



-- 
Best,
Yanfei


Re: Checkpointed data size is zero

2023-07-03 Thread Yanfei Lei
Hi Kamal,

Is the Full Checkpoint Data Size[1] also zero? If it is not, it may simply be
that no data was processed during this checkpoint interval, so there was
nothing new to snapshot.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/monitoring/checkpoint_monitoring/

Shammon FY  于2023年7月4日周二 09:10写道:

>
> Hi Kamal,
>
> You can check whether the Flink job has read data from the source in the Flink web UI; 
> it will show the total record count and size for each operator.
>
> Best,
> Shammon FY
>
> On Sat, Jul 1, 2023 at 4:53 PM Kamal Mittal via user  
> wrote:
>>
>> Hello Community,
>>
>>
>>
>> I have a requirement to read data coming over a TCP socket stream, and for this 
>> I have written a custom source function that reads data from a TCP socket.
>>
>>
>>
>> Job is running successfully but in flink dashboard checkpoint overview, 
>> checkpointed data size is 0.
>>
>>
>>
>> Can you please help if there is anything need to check or some 
>> issue/limitation due to TCP streaming?
>>
>>
>>
>> Rgds,
>>
>> Kamal



--
Best,
Yanfei


Re: RocksdbStateBackend.enableTtlCompactionFilter

2023-06-20 Thread Yanfei Lei
Hi patricia,
The TTL compaction filter in RocksDB has been enabled by default since
Flink 1.10, and it is always enabled in 1.11+[1], so I think there is no
need to explicitly enable the TTL compaction filter.
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/release-notes/flink-1.11/#removal-of-deprecated-option-to-disable-ttl-compaction-filter

patricia lee  于2023年6月20日周二 15:45写道:
>
> Hi,
>
> Between Flink 1.8 and 1.17, enableTtlCompactionFilter() has been removed.
>
> I have seen some examples that build a factory of options to pass as arguments for 
> settings; is this the right approach? If not, what is the best way to enable 
> the compaction filter in the RocksDB state backend?
>
> Thanks
>
> Regards



-- 
Best,
Yanfei


Re: Changelog fail leads to job fail regardless of tolerable-failed-checkpoints config

2023-06-20 Thread Yanfei Lei
Hi Dongwoo,

State changelogs are continuously uploaded to durable storage when the
changelog state backend is enabled. In other words, it also
persists data **outside the checkpoint phase**, and an exception at
that point will directly cause the job to fail. Only exceptions in
the checkpoint phase are counted as (tolerable) checkpoint failures.

Dongwoo Kim  于2023年6月20日周二 18:31写道:
>
> Hello all, I have a question about changelog persist failure.
> When changelog persist process fails due to an S3 timeout, it seems to lead 
> to the job failure regardless of our 
> "execution.checkpointing.tolerable-failed-checkpoints" configuration being 
> set to 5 with this log.
>
> Caused by: java.io.IOException: The upload for 522 has already failed 
> previously
>
> Upon digging into the source code, I observed that Flink consistently checks 
> the sequence number against the latest failed sequence number, resulting in 
> an IOException. I am curious about the reasoning behind this check as it 
> seems to interfere with the "tolerable-failed-checkpoint" configuration 
> working as expected.
> Can anyone explain the goal behind this design?
> Additionally, I'd like to propose a potential solution: What if we adjusted 
> this section to allow failed changelogs to be uploaded on subsequent 
> attempts, up to the "tolerable-failed-checkpoint" limit, before declaring the 
> job as failed?
>
> Thanks in advance
>
> Best regards
> dongwoo
>
>
>
>
>
>
>


-- 
Best,
Yanfei


Re: The JobManager is taking minutes to complete and finalize checkpoints despite the Task Managers seem to complete them in a few seconds

2023-05-03 Thread Yanfei Lei
Hi Francesco,

The overall checkpoint duration in the Flink UI is the EndToEndDuration[1],
which is the time from the JobManager triggering the checkpoint to collecting
the last ack message sent by a TaskManager, so it depends on the slowest
TaskManager.

> "-_message__:__"Completed checkpoint 2515895 for job 
> fdc83bb7b697b8eab84238cd588805ef (7242148389 bytes, checkpointDuration=150047 
> ms, finalizationTime=178119 ms).",_
_logger_name__:__"o.a.f.r.c.CheckpointCoordinator"

The "checkpointDuration" in this log includes the time from pending
Checkpoint to completed Checkpoint[2], and "finalizationTime" includes
the removal of older checkpoints.

> "The checkpoints are taken roughly every 6 minutes instead of the configured 
> interval of 1 minute."

When the number of concurrent checkpoints is set to 1, the next checkpoint will
be triggered only after the previous checkpoint has completed.

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java#L188
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1378

David Morávek  于2023年5月2日周二 20:14写道:
>
> Hi Francesco,
>
> Finalization also includes the removal of older checkpoints (we're only 
> keeping the last N checkpoints), which could be pretty costly in the case of 
> RocksDB (many small files). Can you check how long the removal of old 
> checkpoint files from S3 takes (there might be some rate limiting involved, 
> for example)?
>
> Best,
> D.
>
> On Tue, May 2, 2023 at 1:59 PM Francesco Leone  wrote:
>>
>> Hi There,
>>
>> We are facing a problem with the flink checkpoints in our flink cluster made 
>> of 38 task managers and 2 job managers (HA).  We are running Flink 1.15.2 on 
>> OpenJdk 11 and the checkpoints are stored in AWS S3 with presto.
>>
>> The overall checkpoint duration is reported as 8 seconds in the Flink UI 
>> (screenshot attached), but the job manager is reporting a duration of ~328 
>> seconds in its logs and the checkpoints are taken roughly every 6 minutes 
>> instead of the configured interval of 1 minute.
>>
>> JobManager logs are reporting
>> -_message__:__"Completed checkpoint 2515895 for job 
>> fdc83bb7b697b8eab84238cd588805ef (7242148389 bytes, 
>> checkpointDuration=150047 ms, finalizationTime=178119 ms).",_
>> _logger_name__:__"o.a.f.r.c.CheckpointCoordinator"
>>
>> ---
>> The checkpoint configuration is
>>
>> CheckpointConfig config = ...
>> config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> config.setMaxConcurrentCheckpoints(1);
>>
>> checkArgument(maxFailures >= 0);
>> config.setTolerableCheckpointFailureNumber(maxFailures);
>>
>> config.setCheckpointInterval(1000 * 60);  //1 minute
>> config.setCheckpointTimeout(15 * 1000 * 60); //15 minutes
>> config.setMinPauseBetweenCheckpoints(5000);
>>
>> In the flink-conf.yaml
>> ---
>> presto.s3.ssl.enabled: true
>> presto.s3.sse.enabled: true
>> presto.s3.sse.type: S3
>> fs.s3a.buffer.dir: /mnt/data/tmp
>>
>> s3.entropy.key: HASH
>> s3.entropy.length: 4
>>
>> state.backend: rocksdb
>> state.checkpoints.dir: s3p:///checkpoints/HASH/
>>
>> state.storage.fs.memory-threshold: 1048576
>> state.storage.fs.write-buffer-size: 4194304
>> state.savepoints.dir: s3p://xx/savepoints/
>>
>> state.checkpoints.num-retained: 4
>> state.backend.local-recovery: false
>> state.backend.incremental: true
>> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
>> state.backend.rocksdb.options-factory: custom.RocksDbOptionsFactory
>> state.backend.rocksdb.compression: ZSTD_COMPRESSION
>>
>> execution.checkpointing.unaligned: true
>>  taskmanager.state.local.root-dirs: /mnt/data/tmp/local
>> -
>>
>> Thanks
>> Kind Regards
>>
>> Francesco



-- 
Best,
Yanfei


Re: Flink SQL State

2023-04-26 Thread Yanfei Lei
Hi Giannis,

Except for the “default” Column Family (CF), every other CF represents state
in the RocksDB state backend, and the name of a CF is the name of a
StateDescriptor.

- deduplicate-state is a value state; you can find it in
DeduplicateFunctionBase.java and
MiniBatchDeduplicateFunctionBase.java. They are used for
deduplication.
- _timer_state/event_user-timers, _timer_state/event_timers,
_timer_state/processing_timers and _timer_state/processing_user-timers
are created by the internal timer service, which can be found in
InternalTimeServiceManagerImpl.java. Here is a blog post[1] on best
practices for using timers.
- timer, next-index, left and right can be found in
TemporalRowTimeJoinOperator.java. TemporalRowTimeJoinOperator
implements the logic of the temporal join; this post[2] might be helpful
for understanding what happens in a temporal join.

[1] 
https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/datastream-timer
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins

Giannis Polyzos  于2023年4月26日周三 23:19写道:
>
> I have two input kafka topics - a compacted one (with upsert-kafka) and a 
> normal one.
> When I perform a temporal join I notice the following state being created in 
> rocksdb and was hoping someone could help me better understand what 
> everything means
>
>
> > deduplicate-state: does it refer to duplicate keys found by the 
> > kafka-upsert-connector?
> > timers: what timer and _timer_state/event_timers refer to and whats their 
> > difference? Is it to keep track on when the join results need to be 
> > materialised or state to be expired?
> > next-index: what does it refer to?
> > left: also I'm curious why the left cf has 407 entries. Is it records that 
> > are being buffered because there is no match on the right table?
>
> Thanks



-- 
Best,
Yanfei


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Yanfei Lei
Congratulations!

Best Regards,
Yanfei

ramkrishna vasudevan  于2023年3月27日周一 21:46写道:
>
> Congratulations !!!
>
> On Mon, Mar 27, 2023 at 2:54 PM Yu Li  wrote:
>>
>> Dear Flinkers,
>>
>>
>> As you may have noticed, we are pleased to announce that Flink Table Store 
>> has joined the Apache Incubator as a separate project called Apache 
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>> streaming data lake platform for high-speed data ingestion, change data 
>> tracking and efficient real-time analytics, with the vision of supporting a 
>> larger ecosystem and establishing a vibrant and neutral open source 
>> community.
>>
>>
>> We would like to thank everyone for their great support and efforts for the 
>> Flink Table Store project, and warmly welcome everyone to join the 
>> development and activities of the new project. Apache Flink will continue to 
>> be one of the first-class citizens supported by Paimon, and we believe that 
>> the Flink and Paimon communities will maintain close cooperation.
>>
>>
>> 亲爱的Flinkers,
>>
>>
>> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache 孵化器独立孵化 [1] [2] 
>> [3]。新项目的名字是 Apache 
>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>>
>>
>> 在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 
>> 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。
>>
>>
>> Best Regards,
>>
>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>
>>
>> 致礼,
>>
>> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>>
>>
>> [1] https://paimon.apache.org/
>>
>> [2] https://github.com/apache/incubator-paimon
>>
>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal


[ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

2023-01-30 Thread Yanfei Lei
We are very happy to announce the release of FRocksDB 6.20.3-ververica-2.0.

Compiled files for Linux x86, Linux arm, Linux ppc64le, MacOS x86,
MacOS arm, and Windows are included in the FRocksDB 6.20.3-ververica-2.0
jar, and the FRocksDB dependency in Flink 1.17 will be updated to
6.20.3-ververica-2.0.

Release highlights:
- [FLINK-30457] Add periodic_compaction_seconds option to RocksJava[1].
- [FLINK-30321] Upgrade ZLIB of FRocksDB to 1.2.13[2].
- Avoid expensive ToString() call when not in debug[3].
- [FLINK-24932] Support build FRocksDB Java on Apple silicon[4].

Maven artifacts for FRocksDB can be found at:
https://mvnrepository.com/artifact/com.ververica/frocksdbjni

We would like to thank all efforts from the Apache Flink community
that made this release possible!

[1] https://issues.apache.org/jira/browse/FLINK-30457
[2] https://issues.apache.org/jira/browse/FLINK-30321
[3] https://github.com/ververica/frocksdb/pull/55
[4] https://issues.apache.org/jira/browse/FLINK-24932

Best regards,
Yanfei
Ververica(Alibaba)


Re: How does Flink plugin system work?

2023-01-02 Thread Yanfei Lei
Hi Ruibin,

"metrics.reporter.prom.class" is deprecated in 1.16, maybe "
metrics.reporter.prom.factory.class"[1] can solve your problem.
After reading the related code[2], I think the root cause is that  "
metrics.reporter.prom.class" would load the code via flink's classpath
instead of MetricReporterFactory, due to "Plugins cannot access classes
from other plugins or from Flink that have not been specifically
whitelisted"[3], so ClassNotFoundException is thrown.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#prometheus
[2]
https://github.com/apache/flink/blob/release-1.16/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java#L457
[3]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/

Matthias Pohl via user  于2023年1月2日周一 20:27写道:

> Hi Ruibin,
> could you switch to using the currently supported way for instantiating
> reporters using the factory configuration parameter [1][2]?
>
> Based on the ClassNotFoundException, your suspicion might be right that
> the plugin didn't make it onto the classpath. Could you share the
> startup logs of the JM and TMs. That might help getting a bit more context
> on what's going on. Your approach on integrating the reporter through the
> plugin system [3] sounds about right as far as I can see.
>
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#factory-class
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
> On Fri, Dec 30, 2022 at 11:42 AM Ruibin Xing  wrote:
>
>> Hi community,
>>
>> I am having difficulty understanding the Flink plugin system. I am
>> attempting to enable the Prometheus exporter with the official Flink image
>> 1.16.0, but I am experiencing issues with library dependencies. According
>> to the plugin documentation (
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/),
>> as long as the library is located in the /opt/flink/plugins/
>> directory, Flink should automatically load it, similar to how it loads
>> libraries in the /opt/flink/lib directory. However, Flink does not seem to
>> detect the plugin.
>>
>> Here is the directory structure for /opt/flink:
>> > tree /opt/flink
>> .
>> 
>> ├── plugins
>> │   ├── metrics-prometheus
>> │   │   └── flink-metrics-prometheus-1.16.0.jar
>> ...
>>
>> And here is the related Flink configuration:
>> > metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>
>> The error logs in the task manager show the following:
>> 2022-12-30 10:03:55,840 WARN
>>  org.apache.flink.runtime.metrics.ReporterSetup   [] - The
>> reporter configuration of 'prom' configures the reporter class, which is a
>> deprecated approach to configure reporters. Please configure a factory
>> class instead: 'metrics.reporter.prom.factory.class: ' to
>> ensure that the configuration continues to work with future versions.
>> 2022-12-30 10:03:55,841 ERROR
>> org.apache.flink.runtime.metrics.ReporterSetup   [] - Could not
>> instantiate metrics reporter prom. Metrics might not be exposed/reported.
>> java.lang.ClassNotFoundException:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
>> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
>> Source) ~[?:?]
>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>> at java.lang.Class.forName0(Native Method) ~[?:?]
>> at java.lang.Class.forName(Unknown Source) ~[?:?]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>> at
>> org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209)
>> ~[flink-dist-1.16.0.jar:1.16.0]
>>
>> The Java commands for Flink process:
>> flink  1  3.0  4.6 2168308 765936 ?  Ssl  10:03   1:08
>> /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173
>> -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
>> -Dlog.file=/opt/flink/log/flink--kubernetes-taskmanager-0-checkpoint-ha-example-taskmanager-1-1.log
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> -classpath
>> /opt/flink/lib/flink-cep-1.16.0.jar:/opt/flink/lib/flink-connector-files-1.16.0.jar:/opt/flink/lib/flink-csv-1.16.0.j

Re: ZLIB Vulnerability Exposure in Flink statebackend RocksDB

2022-12-12 Thread Yanfei Lei
Hey Vidya Sagar,

*- Is the code actually using this compression library? Can this
vulnerability issue be ignored?*

I glanced at the LZ4 in Flink. IIUC, LZ4 is used to compress blocks in
batch table which was introduced by FLINK-11858[1], FLINK-23447[2] bumped
it to 1.8. So, LZ4 is actually used by some code.

*- * *would it be ok if we upgrade the version of LZ4 in our local cloned
code base?*

 I guess you can refer to FLINK-23447[2] to upgrade it. I am not familiar
with batch mode, AFAIK, flink-table-runtime[3] would definitely be affected.


[1] https://issues.apache.org/jira/browse/FLINK-11858
[2] https://issues.apache.org/jira/browse/FLINK-23447
[3]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L213

Martijn Visser  于2022年12月9日周五 18:23写道:

> Hi Vidya,
>
> Please keep in mind that the Flink project is driven by volunteers. If
> you're noticing an outdated version for the lz4 compression library and an
> update is required, it would be great if you can open the PR to update that
> dependency yourself.
>
> Best regards,
>
> Martijn
>
> On Thu, Dec 8, 2022 at 10:31 PM Vidya Sagar Mula 
> wrote:
>
>> Thank you Yanfei for taking this issue as a bug and planning a fix in the
>> upcoming version.
>>
>> I have another vulnerability bug coming on our product. It is related to
>> the "LZ4" compression library version. Can you please take a look at this
>> link?
>> https://nvd.nist.gov/vuln/detail/CVE-2019-17543
>>
>> I have noticed that the Flink code base is using *1.8.0* as the LZ4
>> version. The vulnerability is present for the versions *before 1.9.2.*
>>
>> https://github.com/apache/flink/blob/master/pom.xml
>>
>> Can you please look into this issue also and address it in the coming
>> releases?
>>
>> Questions:
>> ---
>> - Is the code actually using this compression library? Can this
>> vulnerability issue be ignored?
>>
>> - Can you please let me know if this is going to be addressed. If yes,
>> until we move to the new Flink version to get the latest changes, would it
>> be ok if we upgrade the version of LZ4 in our local cloned code base? I
>> would like to understand the impact if we make changes in our local Flink
>> code with regards to testing efforts and any other affected modules?
>>
>> Can you please clarify this?
>>
>> Thanks,
>> Vidya Sagar.
>>
>>
>> On Wed, Dec 7, 2022 at 7:59 AM Yanfei Lei  wrote:
>>
>>> Hi Vidya Sagar,
>>>
>>> Thanks for bringing this up.
>>>
>>> The RocksDB state backend defaults to Snappy[1]. If the compression
>>> option is not specifically configured, this vulnerability of ZLIB has no
>>> effect on the Flink application for the time being.
>>>
>>> *> is there any plan in the coming days to address this? *
>>>
>>> The FRocksDB 6.20.3-ververica-1.0
>>> <https://mvnrepository.com/artifact/com.ververica/frocksdbjni/6.20.3-ververica-1.0>
>>>   does
>>> depend on ZLIB 1.2.11, FLINK-30321 is created to address this.
>>>
>>> *> If this needs to be fixed, is there any plan from Ververica to
>>> address this vulnerability?*
>>>
>>> Yes, we plan to publish a new version of FRocksDB[3] in Flink 1.17, and 
>>> FLINK-30321
>>> would be included in the new release.
>>>
>>> *> how to address this vulnerability issue as this is coming as a high
>>> severity blocking issue to our product.*
>>>
>>> As a kind of mitigation, don't configure ZLIB compression for RocksDB
>>> state backend.
>>> If ZLIB must be used now and your product can't wait, maybe you can
>>> refer to this release document[4] to release your own version.
>>>
>>> [1] https://github.com/facebook/rocksdb/wiki/Compression
>>> [2] https://issues.apache.org/jira/browse/FLINK-30321
>>> [3] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
>>> [4]
>>> https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/FROCKSDB-RELEASE.md
>>>
>>> --
>>> Best,
>>> Yanfei
>>> Ververica (Alibaba)
>>>
>>> Vidya Sagar Mula  于2022年12月7日周三 06:47写道:
>>>
>>>> Hi,
>>>>
>>>> There is a ZLIB vulnerability reported by the official National
>>>> Vulnerability Database. This vulnerability causes memory corruption while
>>>> deflating with ZLIB version less than 1.2.12.
>>>> Here is the

Re: SNI issue

2022-12-08 Thread Yanfei Lei
Hi, I didn't face this issue, and I'm guessing it might have something to
do with the configuration of SSL[1]. Have you configured the
"security.ssl.rest.enabled" option?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/#configuring-ssl
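
For reference, enabling SSL for the REST endpoint is configured in
flink-conf.yaml roughly like this (a minimal sketch; the paths and passwords
below are placeholders, not values from your setup):

security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: <keystore-password>
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: <truststore-password>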


Jean-Damien Hatzenbuhler via user  于2022年12月8日周四
01:12写道:

> Hello,
> When using the job manager API with an https proxy that uses SNI in front
> to route the traffic, I get an issue because the flink cli doesn't use the
> SNI when calling in https the API.
> Did other user face this issue ?
> Regards
>


-- 
Best,
Yanfei


Re: Exceeded Checkpoint tolerable failure

2022-12-08 Thread Yanfei Lei
Hi Madan,

Maybe you can check the value of
"execution.checkpointing.tolerable-failed-checkpoints"[1] in your
application configuration, and try to increase this value?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
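
For example, in flink-conf.yaml (the value 5 below is only an illustration;
pick whatever tolerance fits your job):

execution.checkpointing.tolerable-failed-checkpoints: 5

It can also be set programmatically via
CheckpointConfig#setTolerableCheckpointFailureNumber.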

Madan D via user  于2022年12月8日周四 11:40写道:

> Hello All,
> I am seeing below issue after I upgraded from 1.9.0 to 1.14.2 while
> publishing messages to pub sub which is causing frequent job restart and
> slow processing.
>
> Can you please help me.
>
> `Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded
> checkpoint tolerable failure threshold.
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1940)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1912)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1996)
>at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at java.util.concurrent.ThreadPoolExecut
>
> Regards,
> Madan
>
>
>

-- 
Best,
Yanfei


Re: ZLIB Vulnerability Exposure in Flink statebackend RocksDB

2022-12-07 Thread Yanfei Lei
Hi Vidya Sagar,

Thanks for bringing this up.

The RocksDB state backend defaults to Snappy[1]. If the compression option
is not specifically configured, this vulnerability of ZLIB has no effect on
the Flink application for the time being.

*> is there any plan in the coming days to address this? *

The FRocksDB 6.20.3-ververica-1.0
<https://mvnrepository.com/artifact/com.ververica/frocksdbjni/6.20.3-ververica-1.0>
does depend on ZLIB 1.2.11, FLINK-30321 is created to address this.

*> If this needs to be fixed, is there any plan from Ververica to address
this vulnerability?*

Yes, we plan to publish a new version of FRocksDB[3] in Flink 1.17,
and FLINK-30321
would be included in the new release.

*> how to address this vulnerability issue as this is coming as a high
severity blocking issue to our product.*

As a kind of mitigation, don't configure ZLIB compression for RocksDB state
backend.
If ZLIB must be used now and your product can't wait, maybe you can refer
to this release document[4] to release your own version.

[1] https://github.com/facebook/rocksdb/wiki/Compression
[2] https://issues.apache.org/jira/browse/FLINK-30321
[3] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[4]
https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/FROCKSDB-RELEASE.md
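
If you want to make the "no ZLIB" choice explicit, a custom
RocksDBOptionsFactory can pin the column-family compression type and be
registered via the state.backend.rocksdb.options-factory option. This is only
a minimal sketch (the class name is mine, and Snappy is simply the existing
default), not something prescribed by the Flink docs:

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

import java.util.Collection;

public class SnappyOnlyOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no DB-level changes needed for compression
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // keep Snappy explicitly so ZLIB is never selected
        return currentOptions.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
    }
}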

--
Best,
Yanfei
Ververica (Alibaba)

Vidya Sagar Mula  于2022年12月7日周三 06:47写道:

> Hi,
>
> There is a ZLIB vulnerability reported by the official National
> Vulnerability Database. This vulnerability causes memory corruption while
> deflating with ZLIB version less than 1.2.12.
> Here is the link for details...
>
> https://nvd.nist.gov/vuln/detail/cve-2018-25032#vulnCurrentDescriptionTitle
>
> *How is it linked to Flink?: *
> In the Flink statebackend rocksdb, there is ZLIB version 1.2.11 is used as
> part of the .so file. Hence, there is vulnerability exposure here.
>
> *Flink code details/links:*
> I am seeing the latest Flink code base where the statebackend rocksdb
> library *(frocksdbjni)* is coming from Ververica. The pom.xml dependency
> snapshot is here
>
>
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/pom.xml
>
> <dependency>
>   <groupId>com.ververica</groupId>
>   <artifactId>frocksdbjni</artifactId>
>   <version>6.20.3-ververica-1.0</version>
> </dependency>
>
>
> When I see the frocksdbjni code base, the makefile is pointing to
> ZLIB_VER=1.2.11. This ZLIB version is vulnerable as per the NVD.
>
> https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/Makefile
>
> *Questions:*
>
> - This vulnerability is marked as HIGH severity. How is it addressed at
> the Flink/Flink Stateback RocksDb? If not now, is there any plan in the
> coming days to address this?
>
> - As the Statebackend RocksDb is coming from Ververica, I am not seeing
> any latest artifacts published from them. As per the Maven Repository, the
> latest version is 6.20.3-ververica-1.0 and this is the one used in the
> Flink code base.
>
> https://mvnrepository.com/artifact/com.ververica/frocksdbjni
>
> If this needs to be fixed, is there any plan from Ververica to address
> this vulnerability?
>
> - From the Flink user perspective, it is not simple to make the changes to
> .so file locally. How are the Flink user companies addressing this
> vulnerability as it needs changes to the .SO file?
>
> Overall, my main question to the community is, how to address this
> vulnerability issue as this is coming as a high severity blocking issue to
> our product.
>
> Please provide the inputs/suggestions at the earliest.
>
> Thanks,
> Vidya Sagar.
>
>
>
>
>


Re: Task Manager restart and RocksDB incremental checkpoints issue.

2022-11-14 Thread Yanfei Lei
Hi Vidya,
Thanks for sharing your setups.

> *What do you think about the older files that are pulled from the
hostpath to mount path should be deleted first and then create the new
instanceBasepath?*
I think that deleting the old instance path after restarting is hard to
achieve with the current implementation, because the random UUID of old
instancePath isn't recorded and we don't know which path to delete.

> *What is the general design recommendation is such cases where RocksDB
has mount path to a Volume on host node?*
For me, I usually use emptyDir[1] to sidestep the deletion problem and let k8s
be responsible for deleting the old rocksdb instancePath: when a Pod is removed
from a node for any reason, the rocksdb instancePath in the emptyDir is
deleted as the pod is freed.

Hope this will be useful, maybe there are some alternatives.

[1] https://kubernetes.io/docs/concepts/storage/volumes/#emptydir

--
Best,
Yanfei

Martijn Visser  于2022年11月15日周二 05:16写道:

> Hi Vidya,
>
> It is, until Flink 1.17 is released beginning of next year.
> While that code might not have been changed, there can be other changes
> that have impact. See for example this blog
> https://flink.apache.org/2022/05/06/restore-modes.html
>
> Best regards,
>
> Martijn
>
> Op ma 14 nov. 2022 om 17:45 schreef Vidya Sagar Mula 
>
>> Hi Martjin,
>>
>> Thanks for the info. We are in the process of moving to 1.15. Is this
>> version actively supported by community?
>>
>> And coming to my original and follow up questions, I checked the
>> RocksDbStatebackend code from 1.11 and 1.15, it is same.
>>
>> Given K8s configuration with Volume and mounth path, I would like to know
>> the design recommendation for Rocks Db local storage path.
>>
>> Thanks,
>> Vidya
>>
>> On Mon, Nov 14, 2022 at 6:57 AM Martijn Visser 
>> wrote:
>>
>>> Hi Vidya,
>>>
>>> Given that you are still on Flink 1.11 which was released in July 2020
>>> and no longer supported by the community, I would recommend first upgrading
>>> to a later, supported version like Flink 1.16.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Sat, Nov 12, 2022 at 8:07 PM Vidya Sagar Mula 
>>> wrote:
>>>
>>>> Hi Yanfei,
>>>>
>>>> Thank you for the response. I have follow up answer and questions.
>>>>
>>>> I have two set ups. One is on the local environment and the other one
>>>> is a deployment scenario that is on K8s.
>>>>
>>>> - In K8s set up, I have Volume on the cluster node and mount path is
>>>> specified for the RockDB checkpoints location. So, when the Application TM
>>>> POD is restarted, the older checkpoints are read back from the host path
>>>> again when the TM is UP again.
>>>> In this case, RocksDB local directory is pulled with all the older data
>>>> which is not useful for the JOB ID as the "instanceBasePath" is calculated
>>>> with new random UUID.
>>>>
>>>> Questions:
>>>> - What do you think about the older files that are pulled from the
>>>> hostpath to mount path should be deleted first and then create the new
>>>> instanceBasepath?
>>>> Otherwise, we are going to be ended with the GBs of unwanted data.
>>>>
>>>> What is the general design recommendation is such cases where RocksDB
>>>> has mount path to a Volume on host node?
>>>> Please clarify.
>>>>
>>>> Thanks,
>>>> Vidya Sagar.
>>>>
>>>>
>>>> On Thu, Nov 10, 2022 at 7:52 PM Yanfei Lei  wrote:
>>>>
>>>>> Hi Vidya Sagar,
>>>>>
>>>>> Could you please share the reason for TaskManager restart? If the
>>>>> machine or JVM process of TaskManager crashes, the
>>>>> `RocksDBKeyedStateBackend` can't be disposed/closed normally,  so the
>>>>> existing rocksdb instance directory would remain.
>>>>>
>>>>> BTW, if you use Application Mode on k8s, if a TaskManager(pod)
>>>>> crashes, the rocksdb directory would be deleted as the pod is released.
>>>>>
>>>>> Best,
>>>>> Yanfei
>>>>>
>>>>> Vidya Sagar Mula  于2022年11月11日周五 01:39写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using RocksDB state backend for incremental checkpointing with
>>>>>> Flink 1.11 version.
>>>>>>
>>>>

Re: Task Manager restart and RocksDB incremental checkpoints issue.

2022-11-10 Thread Yanfei Lei
Hi Vidya Sagar,

Could you please share the reason for TaskManager restart? If the machine
or JVM process of TaskManager crashes, the `RocksDBKeyedStateBackend` can't
be disposed/closed normally,  so the existing rocksdb instance directory
would remain.

BTW, if you use Application Mode on k8s and a TaskManager (pod) crashes, the
rocksdb directory would be deleted as the pod is released.

Best,
Yanfei

Vidya Sagar Mula  于2022年11月11日周五 01:39写道:

> Hi,
>
> I am using RocksDB state backend for incremental checkpointing with Flink
> 1.11 version.
>
> Question:
> --
> For a given Job ID, Intermediate RocksDB checkpoints are stored under the
> path defined with ""
>
> The files are stored with "_jobID+ radom UUID" prefixed to the location.
>
> Case : 1
> -
> - When I cancel the job, then all the rocksDB checkpoints are deleted
> properly from the location corresponding to that JobId.
> (based on "instanceBasePath" variable stored in RocksDBKeyedStateBackend
> object).
> "NO Issue here. Working as expected".
>
> Case : 2
> -
> - When my TaskManger is restarted, the existing rocksDb checkpoints are
> not deleted.
> New "instanceBasePath" is constructed with the new Random UUID appended to
> the directory.
> And, old checkpoint directories are still there.
>
> questions:
> - Is this expected behaviour not to delete the existing checkPoint
> dirs under the rocksDB local directory?
> - I see the "StreamTaskStateInitializerImpl.java", where new StateBackend
> objects are created. In this case, new directory is created for this Job ID
> appended with new random UUID.
> What happens to the old Directories. Are they going to be purged later on?
> If not, the disk is going to filled up with the older checkpoints. Please
> clarify this.
>
> Thanks,
> Vidya Sagar.
>


Re: Does reduce function on keyed window gives any guarantee on the order of elements?

2022-11-04 Thread Yanfei Lei
Hi Qing,
> am I right to think that there will be 1 reduce function per key, and
they will never overlap?

I agree with this; please correct me if I'm wrong. A certain key is assigned
to a certain key group range, and is thus sent to a certain sub-task (reduce
function instance). A key is only processed by one sub-task of an operator,
so the sub-tasks of one key don't overlap. But in turn, a sub-task will
process multiple keys.
For example, if we have many pairs like (k1,v11), ... (k2,v2) ... (k1,v12)
... (k3,v3) ... (k1,v13), ... then v12 is always processed after v11.
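
A tiny runnable sketch of that pattern (the class name, sample data and the
count window are only for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedReduceOrderExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                    Tuple2.of("k1", 11L), Tuple2.of("k2", 2L),
                    Tuple2.of("k1", 12L), Tuple2.of("k1", 13L))
            // all records of one key are routed to the same sub-task,
            // so per-key order is preserved within a partition
            .keyBy(t -> t.f0)
            // a count window, just so the window fires on this tiny bounded input
            .countWindow(3)
            // within one key, value2 is always a later element than value1
            .reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1))
            .print();

        env.execute("keyed-reduce-order");
    }
}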

Best,
Yanfei

Qing Lim  于2022年11月3日周四 16:17写道:

> Hi Yanfei
>
> Thanks for the explanation.
>
>
>
> If I use reduce in the context of keyed stream with window, am I right to
> think that there will be 1 reduce function per key, and they will never
> overlap? Each reduce function instance will only receive elements from the
> same key in order.
>
>
>
> *From:* Yanfei Lei 
> *Sent:* 03 November 2022 03:06
> *To:* Qing Lim 
> *Cc:* User 
> *Subject:* Re: Does reduce function on keyed window gives any guarantee
> on the order of elements?
>
>
>
> Hi Qing,
>
> > Does it guarantee that it will be called in the same order of elements
> in the stream, where value2 is always 1 element after value1?
>
>
> Order is maintained within each parallel stream partition. If the reduce
> operator only has one sending sub-task, the answer is YES; but if the reduce
> operator has multiple sending sub-tasks, order among elements is only
> preserved for each pair of sending and receiving tasks.
>
> The answers under
> https://stackoverflow.com/questions/38354713/ordering-of-records-in-stream 
> might
> help.
>
> Best,
>
> Yanfei
>
>
>
> Qing Lim  于2022年11月3日周四 01:32写道:
>
> Hi, Flink User Group
>
>
>
> I am trying to use Reduce function, I wonder does it guarantee order when
> its called?
>
>
>
> The signature is as follow:
>
>
>
> T reduce(T value1, T value2) throws Exception;
>
>
>
> Does it guarantee that it will be called in the same order of elements in
> the stream, where value2 is always 1 element after value1?
>
>
>
> Kind regards
>
>
>
>
>
> *Qing Lim *| Marshall Wace LLP, George House, 131 Sloane Street, London | 
> E-mail:
> q@mwam.com | Tel: +44 207 925 4865
>


-- 
Best,
Yanfei


Re: Does reduce function on keyed window gives any guarantee on the order of elements?

2022-11-02 Thread Yanfei Lei
Hi Qing,
> Does it guarantee that it will be called in the same order of elements in
the stream, where value2 is always 1 element after value1?

Order is maintained within each parallel stream partition. If the reduce
operator only has one sending sub-task, the answer is YES; but if the reduce
operator has multiple sending sub-tasks, order among elements is only
preserved for each pair of sending and receiving tasks.
The answers under
https://stackoverflow.com/questions/38354713/ordering-of-records-in-stream
might
help.

Best,
Yanfei

Qing Lim  于2022年11月3日周四 01:32写道:

> Hi, Flink User Group
>
>
>
> I am trying to use Reduce function, I wonder does it guarantee order when
> its called?
>
>
>
> The signature is as follow:
>
>
>
> T reduce(T value1, T value2) throws Exception;
>
>
>
> Does it guarantee that it will be called in the same order of elements in
> the stream, where value2 is always 1 element after value1?
>
>
>
> Kind regards
>
>
>
>
>
> *Qing Lim *| Marshall Wace LLP, George House, 131 Sloane Street, London | 
> E-mail:
> q@mwam.com | Tel: +44 207 925 4865
>


Re: Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread yanfei lei
Hi Dan,
Usually broadcast state needs more network buffers; the network buffers used
to exchange data records between tasks request a portion of direct
memory[1], so I think it is possible to get "Direct buffer memory" OOM
errors in this scenario. Maybe you can try to increase
taskmanager.memory.framework.off-heap.size and
taskmanager.memory.task.off-heap.size.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model
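
For example, in flink-conf.yaml (the sizes below are placeholders you would
tune for your own job, not recommended values):

taskmanager.memory.framework.off-heap.size: 256m
taskmanager.memory.task.off-heap.size: 256m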

Best,
Yanfei

Dan Hill  于2022年10月21日周五 15:39写道:

> Hi.  My team recently added broadcast state to our Flink jobs.  We've
> started hitting this OOM "Direct buffer memory" error.  Is this a common
> problem with broadcast state?  Or is it likely a different problem?
> Thanks! - Dan
>


Re: Presto S3 filesystem access issue - checkpointing - EKS

2022-10-17 Thread yanfei lei
Hi Vignesh,
403 status code makes this look like an authorization issue.

> *Some digging into the presto configs and I had this one turned off to
presto.s3.use-instance-credentials: "false". (Is this right?)*

From the document[1], it is recommended to set *hive.s3.use-instance-credentials*
to *true* and use IAM Roles for *EC2* to govern access to S3.
Maybe you can try the following two ways:
1) Set *s3.use-instance-credentials* to *true* and use IAM roles.
2) Or set hive.s3.aws-access-key and hive.s3.aws-secret-key directly.
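
In flink-conf.yaml that could look roughly like this (a sketch only; the key
values are placeholders, and for option 2 Flink's S3 filesystems also accept
the generic s3.access-key / s3.secret-key keys):

presto.s3.use-instance-credentials: "true"
# or, only if you really need static credentials instead of IAM roles:
# s3.access-key: <ACCESS_KEY>
# s3.secret-key: <SECRET_KEY>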


[1]https://prestodb.io/docs/current/connector/hive.html#s3-credentials

Best,
Yanfei




Vignesh Kumar Kathiresan via user  于2022年10月18日周二
03:48写道:

> Hello all,
>
> I am trying to achieve flink application checkpointing to s3 using the
> recommended presto s3 filesystem plugin.
> My application is deployed in a kubernetes cluster (EKS) in flink
> application mode.
>
> When I start the application I am getting a forbidden 403 response
>
> ```Caused by:
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ;
> Proxy: null) Path: s3p://bucket/checkpoint_dir/xxx/chk-2/yyy)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:677)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
> ~[?:?]```
>
> So far I have
> 1) the IAM role attached to the service account has been given full s3
> access.
> 2) the config for checkpointing as
> state.checkpoints.dir: s3p://BUCKET_NAME/checkpoints  (tried with s3://
>  also)
> 3) Some digging into the presto configs and I had this one turned off too
> presto.s3.use-instance-credentials: "false". (Is this right?)
>
> Is there something I am missing(some other config to be set?) for this
> checkpointing access.
>
> P.S we have other application level access to s3 working fine
>
> Thanks,
> Vignesh
>


Re: In-flight data within aligned checkpoints/savepoints

2022-08-22 Thread yanfei lei
Hi Darin,

>  I often see my checkpoints contain "Processed (persisted) in-flight
data".
The values outside the parentheses represent processed in-flight data[1],
and the values inside the parentheses represent persisted in-flight data[1].
What kind of case did you see in your Web UI? If the >0 values are outside
the parentheses, I think they represent processed in-flight data.

> Is it possible there is some configuration I could be missing? I have not
explicitly disabled unaligned checkpoints but perhaps I should?
You can see whether unaligned checkpoints are enabled in the checkpoint
configuration tab[2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#history-tab
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#configuration-tab
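
If you want to rule it out explicitly, unaligned checkpoints can be disabled
in flink-conf.yaml (this is the option name in 1.13/1.14, and false is
already the default):

execution.checkpointing.unaligned: false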


Best,
Yanfei

Darin Amos via user  于2022年8月23日周二 07:02写道:

> Hi All!
>
> I am running Flink 1.13.1 and I have unaligned checkpoints disabled,
> however I often see my checkpoints contain "Processed (persisted)
> in-flight data". According to the Flink documentation, it states:
>
>- *Persisted in-flight data: The number of bytes persisted during the
>alignment (time between receiving the first and the last checkpoint
>barrier) over all acknowledged subtasks. This is > 0 only if the unaligned
>checkpoints are enabled.*
>
> I'm happy to share screenshots of my Checkpoint configuration and history
> tabs if needed. Colleagues of mine running 1.14.X have also observed the
> same behavior. Is it possible there is some configuration I could be
> missing? I have not explicitly disabled unaligned checkpoints but perhaps I
> should? I wonder if there might be some kind of bug where the UI is showing
> unaligned checkpoints, but the behaviour is not honoring that config.
>
> Thanks!
>
> Darin
>


Re: get state from window

2022-08-17 Thread yanfei lei
Hi, there are two methods on the Context object that a process() invocation
receives that allow access to the two types of state:

   - globalState(), which allows access to keyed state that is not scoped
   to a window
   - windowState(), which allows access to keyed state that is also scoped
   to the window

Maybe you can refer to the implementation of WindowReaderTest.
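
A minimal sketch of what that looks like inside a ProcessWindowFunction (the
class name and the "count per key" logic are just illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountingWindowFunction
        extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private final ValueStateDescriptor<Long> totalDescriptor =
            new ValueStateDescriptor<>("total-count", Long.class);

    @Override
    public void process(
            String key, Context ctx, Iterable<Long> elements, Collector<String> out)
            throws Exception {
        // ctx.windowState() gives keyed state scoped to this window only;
        // ctx.globalState() gives keyed state that survives across windows.
        ValueState<Long> total = ctx.globalState().getState(totalDescriptor);

        long inThisWindow = 0;
        for (Long ignored : elements) {
            inThisWindow++;
        }
        long newTotal = (total.value() == null ? 0L : total.value()) + inThisWindow;
        total.update(newTotal);

        out.collect(key + ": " + inThisWindow + " in this window, " + newTotal + " overall");
    }
}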

Best,
Yanfei

2022年8月18日 上午10:05,曲洋  写道:

Hi dear engineers,

I have one question: does Flink streaming support getting the state? I
override the open method in the map processor, initializing the state (some
metric for the data). How can I get the state in my window?

Thanks for your help!


Re: How to clean up RocksDB local directory in K8s statefulset

2022-06-27 Thread yanfei lei
Hi Allen, what volumes do you use for your TM pod? If you want your data to
be deleted when the pod restarts, you can use an ephemeral volume like
EmptyDir.
And Flink should remove temporary files automatically when they are not
needed anymore (see this discussion).

The working directory only takes effect from Flink 1.15 onwards; in Flink 1.14
a local RocksDB directory is usually located under the /tmp directory if you
don't specifically configure state.backend.rocksdb.localdir.
So, the working directory can't help here.
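
A minimal sketch of the emptyDir idea for the TaskManager pod spec (the names
and mount path are placeholders; point state.backend.rocksdb.localdir at the
mount):

# fragment of the TaskManager pod template, illustrative only
spec:
  containers:
    - name: flink-taskmanager
      volumeMounts:
        - name: rocksdb-local
          mountPath: /rocksdb-local   # state.backend.rocksdb.localdir: /rocksdb-local
  volumes:
    - name: rocksdb-local
      emptyDir: {}                    # wiped whenever the pod is recreated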

Allen Wang  于2022年6月28日周二 04:39写道:

> Hi Folks,
>
> We created a stateful job using SessionWindow and RocksDB state backend
> and deployed it on Kubernetes Statefulset with persisted volumes. The Flink
> version we used is 1.14.
>
> After the job runs for some time, we observed that the size of the local
> RocksDB directory started to grow and there are more and more
> directories created inside it. It seems that when the job is restarted or
> the task manager K8s pod is restarted, the previous RocksDB directory
> corresponding to the assigned operator is not cleaned up. Here is an
> example:
>
> drwxr-xr-x 3 root root 4096 Jun 27 18:23
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_c97f3f3f-649a-467d-82af-2bc250ec6e22
> drwxr-xr-x 3 root root 4096 Jun 27 18:45
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_e4fca2c3-74c7-4aa2-9ca1-dda866b8de11
> drwxr-xr-x 3 root root 4096 Jun 27 18:56
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
> drwxr-xr-x 3 root root 4096 Jun 27 17:34
> job__op_WindowOperator_f6dc7f4d2283f4605b127b9364e21148__3_4__uuid_08a14423-bea1-44ce-96ee-360a516d72a6
>
> Although only
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
> is the active running operator, the other directories for the past
> operators still exist.
>
> We set up the task manager property taskmanager.resource-id to be the task
> manager pod name under the statefulset but it did not seem to help cleaning
> up previous directories.
>
> Any pointers to solve this issue?
>
> We checked the latest document and it seems that Flink 1.15 introduced the
> concept of local working directory:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/working_directory/.
> Does that help cleaning up the RocksDB directory?
>
> Thanks,
> Allen
>
>
>
>
>