Unsubscribe

2022-12-19 Thread 邱钺
Unsubscribe


Understanding pipelined regions

2022-12-19 Thread Raihan Sunny
Hi,

I'm quite new to the world of stream and batch processing. I've been
reading about pipelined regions in Flink and am quite confused about what
they mean. My specific problem involves a streaming job that looks like the
following:

1. There is a Kafka source that takes in input data that sets off a
series of operations
2. As part of the first operation, I have an operator that produces
multiple values, each of which has to be fed into several different
operators in parallel
3. The operators each produce a result which I keyBy and merge together
using the union operator
4. The merged result is then written to a Kafka sink

The problem is that when one of the parallel operators throws an exception,
all the tasks in the entire pipeline get restarted, including the source,
which then replays the input data and the process starts off once again. My
question is whether it's possible to restart only the tasks of the branch
that failed rather than the whole job. I do realize that splitting up the
job, so that the first operator writes its output to a sink which then
serves as the source for the subsequent operations, can mitigate the
problem. I was just wondering if it's possible in the scenario that I have
described above. In general, how can I "create" a pipelined region?
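
For reference, here is a minimal sketch of the topology described above (all names and functions are hypothetical stand-ins; the real job uses Kafka source and sink operators):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BranchingTopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source whose input sets off the operations.
        DataStream<String> source = env.fromElements("a", "b", "c");

        // The first operation fans out into several parallel branches.
        DataStream<String> branch1 = source.map(v -> "op1:" + v);
        DataStream<String> branch2 = source.map(v -> "op2:" + v);

        // Each branch's result is keyed and merged via union...
        branch1.union(branch2)
                .keyBy(v -> v.substring(0, 4))
                .print(); // ...and written out (stand-in for the Kafka sink).

        env.execute("branching-topology-sketch");
    }
}
```

As far as I understand the failover docs, all exchanges in a streaming job are pipelined, so the fan-out plus keyBy/union wiring above connects every task into a single failover region, and the region failover strategy restarts the whole region on any task failure. Regions are only split at blocking exchanges or between disconnected parts of the graph, which is effectively what the sink/source split mentioned above introduces.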


Thanks,
Sunny



Re: Re: How can Flink SQL connector options support the Map data type?

2022-12-19 Thread casel.chen
I've looked at it. It doesn't support an HTTP source table, and even the HTTP lookup table doesn't support the Map data type.

On 2022-12-19 14:51:42, "Weihua Hu"  wrote:
>Hi, you can try the standalone open-source http connector
>
>https://github.com/getindata/flink-http-connector
>
>Best,
>Weihua
>
>
>On Sat, Dec 17, 2022 at 10:21 AM casel.chen  wrote:
>
>> I want to develop a Flink SQL HTTP Connector and ran into the problem of
>> users adding custom request headers. How can I support the Map data type
>> in the connector options?
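
For reference, a hedged sketch of how a custom connector could declare a Map-typed option via Flink's ConfigOptions (the option name and description here are hypothetical):

```java
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class HttpConnectorOptions {
    // A Map-typed option for user-supplied request headers.
    public static final ConfigOption<Map<String, String>> HEADERS =
            ConfigOptions.key("headers")
                    .mapType()
                    .noDefaultValue()
                    .withDescription("Custom HTTP request headers.");
}
```

In the WITH clause, a map-typed option value is written as comma-separated key:value pairs, e.g. 'headers' = 'Content-Type:application/json,X-Trace-Id:abc'.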


Windowing query with group by produces update stream

2022-12-19 Thread Theodor Wübker
Hey everyone,

I would like to run a Windowing-SQL query with a group-by clause on a Kafka 
topic and write the result back to Kafka. Right now, the program always says 
that I am creating an update-stream that can only be written to an 
Upsert-Kafka-Sink. That seems odd to me, because running my grouping over a 
tumbling window should only require writing the result to Kafka exactly once. 
Quote from docs [1]: 'Unlike other aggregations on continuous tables, window 
aggregation do not emit intermediate results but only a final result, the total 
aggregation at the end of the window'
I understand that ‘group-by’ should generate an update-stream as long as there 
is no windowing happening - but there is in my case. How can I get my program 
to not create an update-, but a simple append stream instead? My query looks 
roughly like this:

"SELECT x, window_start, count(*) as y 
FROM TABLE(TUMBLE(TABLE my_table, DESCRIPTOR(timestamp), INTERVAL '1' DAY))
GROUP BY x, window_start”
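
One point worth checking, assuming I read the window TVF docs [1] correctly: the planner only recognizes a window aggregation (and emits an append-only stream) when both window_start and window_end appear in the GROUP BY clause; grouping by window_start alone falls back to a regular, updating group aggregation. A minimal sketch under that assumption, with a hypothetical `ts` time-attribute column standing in for the real schema:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowAggSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical stand-in table; the real job reads from Kafka. The
        // WATERMARK clause makes `ts` a time attribute, which the TVF needs.
        tEnv.executeSql(
                "CREATE TABLE my_table ("
                        + "  x STRING,"
                        + "  ts TIMESTAMP(3),"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Grouping by both window_start and window_end keeps this a window
        // aggregation, so the result is an append-only stream.
        tEnv.executeSql(
                "SELECT x, window_start, window_end, count(*) AS y "
                        + "FROM TABLE(TUMBLE(TABLE my_table, DESCRIPTOR(ts), INTERVAL '1' DAY)) "
                        + "GROUP BY x, window_start, window_end")
                .print();
    }
}
```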

-Theo


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/



RE: Re: Re: Question about the sql-client pyexec parameter taking effect

2022-12-19 Thread kung harold
Both setting the -pyclientexec parameter and running SET 'python.client.executable'='';
in the sql client terminal fix the
`java.lang.IllegalStateException: Instantiating python function xx failed`
error, but when I then run the udf normally it still fails, and I haven't
found the cause of `Could not initialize class
org.apache.beam.sdk.options.PipelineOptionsFactory`.
```
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.sdk.options.PipelineOptionsFactory

#client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while 
retrieving result.
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
b3dcd20267356a2280d09df4bcd6f440)
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
 ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
 ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source) ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
 ~[flink-dist-1.16.0.jar:1.16.0]
```
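
Possibly worth a cross-check, as a hedged sketch (Java Table API; the paths and the function name are taken from the related messages, otherwise hypothetical): 'python.client.executable' only controls the interpreter used when compiling the job, while 'python.executable' is what the Beam-based Python UDF workers run at execution time, so both may need to point at the packaged environment:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PyExecConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Interpreter used on the client side when compiling the job.
        tEnv.getConfig().getConfiguration().setString(
                "python.client.executable",
                "/opt/flink_data/requirements/py_env/jm_env/bin/python3.7");

        // Interpreter used by the Python UDF workers at execution time.
        tEnv.getConfig().getConfiguration().setString(
                "python.executable",
                "/opt/flink_data/requirements/py_env/jm_env/bin/python3.7");

        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' "
                        + "LANGUAGE PYTHON");
    }
}
```

That said, the NoClassDefFoundError above is thrown on the JVM side, so it may also be worth checking that flink-python-1.16.0.jar (which bundles the Beam classes) is on the classpath of the cluster itself, not only of the sql client.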

aws glue connector

2022-12-19 Thread Katz, David L via user
Hi-

Has anyone used a Glue table connector (specifically, trying to get a streaming 
Glue table that sits on top of a Kinesis data stream)?  I have been using the 
Kinesis stream connector but want to integrate with Lake Formation, so I would 
like to take advantage of the Glue table layer.

Thanks,
-Dave

David L Katz (he/him/his) | Executive Director | Macro Technology | 545 
Washington Blvd, Floor 4 | Jersey City, NJ 07310




Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-19 Thread Robin Cassan via user
That's fantastic, thanks a lot for the info, I will definitely try that!

Cheers,

Robin

On Mon, Dec 19, 2022 at 13:16, Hangxiang Yu  wrote:

> Hi, Robin.
> If you have upgraded to 1.16, I think your problem will be
> solved automatically, because the restore mode has been supported since 1.15.
> The NO_CLAIM mode is the default restore mode [1], which will help you to
> break the lineage of snapshots (both checkpoints and savepoints).
> When you use this mode, the first checkpoint will be full, but the
> following checkpoints will be incremental, as long as you enable
> incremental checkpoints.
> Of course, you could also try the CLAIM mode, in which Flink will help
> you clean up the old checkpoints as much as possible.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
> On Mon, Dec 19, 2022 at 5:28 PM Robin Cassan <
> robin.cas...@contentsquare.com> wrote:
>
>> Hey Hangxiang! Thanks a lot for your answer
>>
>> Indeed we are currently using Flink 1.13 and plan on moving to 1.16 soon,
>> so it's great news that the non-incremental checkpoints were optimized,
>> thanks for sharing! We decided not to use incremental checkpoints because
>> it was hard to expire files on S3, since newer checkpoints
>> might need shared files from older ones (given our deployment process is
>> based on restoring the old job's checkpoint and not taking a savepoint).
>> That said, we will keep this idea in mind.
>>
>> Can you however confirm that there is no way to isolate resources used for
>> checkpoints and achieve consistent latency? Is our only option to try
>> to reduce the checkpointing duration?
>>
>> Thanks again,
>> Robin
>>
>> On Fri, Dec 16, 2022 at 13:48, Hangxiang Yu  wrote:
>>
>>> Hi, Robin.
>>> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I
>>> guess your version of Flink is below 1.16 and you kept the default
>>> config of 'state.backend.incremental'.
>>> In versions below 1.16, RocksDBStateBackend uses the savepoint format
>>> for its full snapshot [1], so it iterates over all data in the DB and
>>> uploads it, which costs a lot of CPU when checkpointing.
>>>
>>> So I'd like to suggest that you:
>>> 1. set 'state.backend.incremental' to true to enable incremental
>>> checkpoints [2], to avoid the iteration cost and reduce the upload size.
>>> 2. upgrade to 1.16 to fix [1]. Of course, I think it's better to enable
>>> incremental checkpoints as well.
>>> 3. upgrade to 1.16 and use the generic incremental checkpoint (changelog)
>>> [3] if you find the CPU usage is still not stable.
>>> That feature helps keep the incremental checkpoint size small and
>>> stable, which makes CPU usage more stable.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-28699
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/checkpointing/#state-backend-incremental
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-changelog-options
>>>
>>> On Fri, Dec 16, 2022 at 1:03 AM Robin Cassan via user <
>>> user@flink.apache.org> wrote:
>>>
 Hello all!

 We are trying to bring our flink job closer to real-time processing and
 currently our main issue is latency that happens during checkpoints. Our
 job uses RocksDB with periodic checkpoints, which are a few hundred GBs
 every 15 minutes. We are trying to reduce the checkpointing duration but
 our main concern is the fact that, during checkpoints, 70% of our CPU is
 used for checkpointing (*FullSnapshotAsyncWriter.writeKVStateData*)

 Ideally, we would like to allocate a fixed amount of our CPU resources
 to this task (let's say 10%), which would allow the regular processing of
 data to remain stable while checkpointing. This comes at the expense of
 having 10% idle CPU in-between checkpoints and having longer checkpoint
 durations, but we are OK with this tradeoff if it brings more predictable
 latency overall.

 However, I didn't find any setting to achieve this. It seems like these
 checkpointing tasks are scheduled in the *asyncOperationsThreadPool* that
 resides in *StreamTask.java* and this pool seems to be unbounded.
 Do you think that having an upper bound to this thread pool would
 achieve the outcome we expect? And if so, is there a way to add this bound?

 Thanks a lot!

 Robin

>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>
>
> --
> Best,
> Hangxiang.
>


Re: Rocksdb Incremental checkpoint

2022-12-19 Thread Hangxiang Yu
Hi,
IIUC, numRetainedCheckpoints will only influence the space overhead of the
checkpoint dir, but not the incremental size.
RocksDB executes incremental checkpoints based on the shared directory, which
reuses SST files as much as possible (a file may come from the last
checkpoint, or from long, long ago).
numRetainedCheckpoints just makes Flink retain more cp-x directories, and SST
files in the shared directory that are not used by the next incremental
checkpoint.
Whether it's 1 or 3, the size of the incremental checkpoint should be
similar.

Could you check your configuration, source status, job status, etc. again
to see whether there are any other differences?
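
For completeness, a minimal sketch (assuming a Java DataStream job; names hypothetical) of the two settings being compared in this thread:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // How many completed checkpoints Flink keeps around. This affects
        // how long old chk-x directories (and the shared SST files they
        // reference) are retained, not the size of each incremental delta.
        conf.setInteger("state.checkpoints.num-retained", 3);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000);
        // 'true' enables incremental checkpoints: only newly created SST
        // files are uploaded; unchanged ones are referenced from 'shared/'.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.fromElements(1, 2, 3).keyBy(i -> i % 2).sum(0).print();
        env.execute("retained-checkpoints-sketch");
    }
}
```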

On Mon, Dec 19, 2022 at 9:00 PM Puneet Duggal 
wrote:

> Hi,
>
> After going through the following article regarding rocksdb incremental
> checkpoint (
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html),
> my understanding was that at each checkpoint, Flink only checkpoints newly
> created SSTables, whereas the others it can reference from earlier checkpoints
> (depending upon the number of retained checkpoints).
>
> So can we assume from this that if numRetainedCheckpoints = 1 (default),
> the behaviour is the same as checkpointing the complete data as-is (same as
> non-incremental checkpointing)?
>
> I also performed a load test by running exactly the same Flink job on 2
> different clusters. The only difference between the clusters was the number
> of retained checkpoints.
>
> Incremental Checkpoint Load Test
>
> [image: Cluster 1, num Retained Checkpoints = 3]
>
> [image: Cluster 2, num Retained Checkpoints = 1]
>
> As we can see, the checkpoint data size for the cluster with num of retained
> checkpoints = 1 is less than for the one with the greater number of retained
> checkpoints.
>

-- 
Best,
Hangxiang.


Rocksdb Incremental checkpoint

2022-12-19 Thread Puneet Duggal
Hi,

After going through the following article regarding RocksDB incremental
checkpoints
(https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html),
my understanding was that at each checkpoint, Flink only checkpoints newly
created SSTables, whereas the others it can reference from earlier checkpoints
(depending upon the number of retained checkpoints).

So can we assume from this that if numRetainedCheckpoints = 1 (default),
the behaviour is the same as checkpointing the complete data as-is (same as
non-incremental checkpointing)?

I also performed a load test by running exactly the same Flink job on 2
different clusters. The only difference between the clusters was the number
of retained checkpoints.

Incremental Checkpoint Load Test

[image: Cluster 1, num Retained Checkpoints = 3]

[image: Cluster 2, num Retained Checkpoints = 1]

As we can see, the checkpoint data size for the cluster with num of retained
checkpoints = 1 is less than for the one with the greater number of retained
checkpoints.




Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-19 Thread Hangxiang Yu
Hi, Robin.
If you have upgraded to 1.16, I think your problem will be
solved automatically, because the restore mode has been supported since 1.15.
The NO_CLAIM mode is the default restore mode [1], which will help you to
break the lineage of snapshots (both checkpoints and savepoints).
When you use this mode, the first checkpoint will be full, but the
following checkpoints will be incremental, as long as you enable
incremental checkpoints.
Of course, you could also try the CLAIM mode, in which Flink will help
you clean up the old checkpoints as much as possible.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
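
As a rough illustration (option keys from the restore-mode docs in [1]; the snapshot path is hypothetical, and in practice these options are usually passed at job submission rather than set in code):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreModeSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Snapshot (checkpoint or savepoint) to restore from.
        conf.setString("execution.savepoint.path",
                "s3://my-bucket/checkpoints/chk-42");
        // NO_CLAIM (the default since 1.15): the first checkpoint is full,
        // so the restored snapshot can be deleted independently afterwards.
        // CLAIM: Flink takes ownership and cleans the snapshot up itself.
        conf.setString("execution.savepoint-restore-mode", "NO_CLAIM");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual.
    }
}
```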

On Mon, Dec 19, 2022 at 5:28 PM Robin Cassan 
wrote:

> Hey Hangxiang! Thanks a lot for your answer
>
> Indeed we are currently using Flink 1.13 and plan on moving to 1.16 soon,
> so it's great news that the non-incremental checkpoints were optimized,
> thanks for sharing! We decided not to use incremental checkpoints because
> it was hard to expire files on S3, since newer checkpoints
> might need shared files from older ones (given our deployment process is
> based on restoring the old job's checkpoint and not taking a savepoint).
> That said, we will keep this idea in mind.
>
> Can you however confirm that there is no way to isolate resources used for
> checkpoints and achieve consistent latency? Is our only option to try
> to reduce the checkpointing duration?
>
> Thanks again,
> Robin
>
> On Fri, Dec 16, 2022 at 13:48, Hangxiang Yu  wrote:
>
>> Hi, Robin.
>> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I
>> guess your version of Flink is below 1.16 and you kept the default
>> config of 'state.backend.incremental'.
>> In versions below 1.16, RocksDBStateBackend uses the savepoint format
>> for its full snapshot [1], so it iterates over all data in the DB and
>> uploads it, which costs a lot of CPU when checkpointing.
>>
>> So I'd like to suggest that you:
>> 1. set 'state.backend.incremental' to true to enable incremental
>> checkpoints [2], to avoid the iteration cost and reduce the upload size.
>> 2. upgrade to 1.16 to fix [1]. Of course, I think it's better to enable
>> incremental checkpoints as well.
>> 3. upgrade to 1.16 and use the generic incremental checkpoint (changelog)
>> [3] if you find the CPU usage is still not stable.
>> That feature helps keep the incremental checkpoint size small and
>> stable, which makes CPU usage more stable.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-28699
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/checkpointing/#state-backend-incremental
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-changelog-options
>>
>> On Fri, Dec 16, 2022 at 1:03 AM Robin Cassan via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello all!
>>>
>>> We are trying to bring our flink job closer to real-time processing and
>>> currently our main issue is latency that happens during checkpoints. Our
>>> job uses RocksDB with periodic checkpoints, which are a few hundred GBs
>>> every 15 minutes. We are trying to reduce the checkpointing duration but
>>> our main concern is the fact that, during checkpoints, 70% of our CPU is
>>> used for checkpointing (*FullSnapshotAsyncWriter.writeKVStateData*)
>>>
>>> Ideally, we would like to allocate a fixed amount of our CPU resources
>>> to this task (let's say 10%), which would allow the regular processing of
>>> data to remain stable while checkpointing. This comes at the expense of
>>> having 10% idle CPU in-between checkpoints and having longer checkpoint
>>> durations, but we are OK with this tradeoff if it brings more predictable
>>> latency overall.
>>>
>>> However, I didn't find any setting to achieve this. It seems like these
>>> checkpointing tasks are scheduled in the *asyncOperationsThreadPool* that
>>> resides in *StreamTask.java* and this pool seems to be unbounded.
>>> Do you think that having an upper bound to this thread pool would
>>> achieve the outcome we expect? And if so, is there a way to add this bound?
>>>
>>> Thanks a lot!
>>>
>>> Robin
>>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>

-- 
Best,
Hangxiang.


Problem with Flink SQL failing to call a Python UDF

2022-12-19 Thread kung harold
Flink was started with the official docker-compose; the Python
env was packaged with conda (python3.7.12 + apache-flink==1.16.0 + apache-beam==2.38.0);
```bash
# sql client startup parameters
# sql_client/task_manager/job_manager all mount the corresponding directory; permissions and owner are flink:flink
bin/sql-client.sh \
  --pyExecutable /opt/flink_data/requirements/py_env/jm_env/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/

# execution environment packaged with conda
bin/sql-client.sh \
  --pyArchives file:///opt/flink_data/requirements/py_env/pyflink_jm_1.16.0_env.zip \
  --pyExecutable pyflink_jm_1.16.0_env.zip/bin/python3.7 \
  -pyfs file:///opt/flink_data/requirements/udfs/ \
  -j /opt/flink/lib/flink-python-1.16.0.jar

# UDF function
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()],result_type=DataTypes.STRING())
def func1(line):
return "udf_{}".format(line)


# SQL test invocation
CREATE TEMPORARY FUNCTION func1 AS 'to_fahr.to_fahr.func1' LANGUAGE PYTHON;
select func1('Chicago');
# console log
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.sdk.options.PipelineOptionsFactory

#client log
org.apache.flink.table.client.gateway.SqlExecutionException: Error while 
retrieving result.
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.lang.RuntimeException: Failed to fetch next result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.io.IOException: Failed to fetch job execution result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
b3dcd20267356a2280d09df4bcd6f440)
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 ~[?:?]
at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75)
 ~[flink-sql-client-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: b3dcd20267356a2280d09df4bcd6f440)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
 ~[flink-dist-1.16.0.jar:1.16.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) 
~[?:?]
at 
```
Re: Reduce checkpoint-induced latency by tuning the amount of resource available for checkpoints

2022-12-19 Thread Robin Cassan via user
Hey Hangxiang! Thanks a lot for your answer

Indeed we are currently using Flink 1.13 and plan on moving to 1.16 soon,
so it's great news that the non-incremental checkpoints were optimized,
thanks for sharing! We decided not to use incremental checkpoints because
it was hard to expire files on S3, since newer checkpoints
might need shared files from older ones (given our deployment process is
based on restoring the old job's checkpoint and not taking a savepoint).
That said, we will keep this idea in mind.

Can you however confirm that there is no way to isolate resources used for
checkpoints and achieve consistent latency? Is our only option to try
to reduce the checkpointing duration?

Thanks again,
Robin

On Fri, Dec 16, 2022 at 13:48, Hangxiang Yu  wrote:

> Hi, Robin.
> From your code path (*FullSnapshotAsyncWriter.writeKVStateData*), I guess
> your version of Flink is below 1.16 and you kept the default config of
> 'state.backend.incremental'.
> In versions below 1.16, RocksDBStateBackend uses the savepoint format
> for its full snapshot [1], so it iterates over all data in the DB and
> uploads it, which costs a lot of CPU when checkpointing.
>
> So I'd like to suggest that you:
> 1. set 'state.backend.incremental' to true to enable incremental
> checkpoints [2], to avoid the iteration cost and reduce the upload size.
> 2. upgrade to 1.16 to fix [1]. Of course, I think it's better to enable
> incremental checkpoints as well.
> 3. upgrade to 1.16 and use the generic incremental checkpoint (changelog)
> [3] if you find the CPU usage is still not stable.
> That feature helps keep the incremental checkpoint size small and
> stable, which makes CPU usage more stable.
>
> [1] https://issues.apache.org/jira/browse/FLINK-28699
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/checkpointing/#state-backend-incremental
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-changelog-options
>
> On Fri, Dec 16, 2022 at 1:03 AM Robin Cassan via user <
> user@flink.apache.org> wrote:
>
>> Hello all!
>>
>> We are trying to bring our flink job closer to real-time processing and
>> currently our main issue is latency that happens during checkpoints. Our
>> job uses RocksDB with periodic checkpoints, which are a few hundred GBs
>> every 15 minutes. We are trying to reduce the checkpointing duration but
>> our main concern is the fact that, during checkpoints, 70% of our CPU is
>> used for checkpointing (*FullSnapshotAsyncWriter.writeKVStateData*)
>>
>> Ideally, we would like to allocate a fixed amount of our CPU resources to
>> this task (let's say 10%), which would allow the regular processing of data
>> to remain stable while checkpointing. This comes at the expense of having
>> 10% idle CPU in-between checkpoints and having longer checkpoint durations,
>> but we are OK with this tradeoff if it brings more predictable latency
>> overall.
>>
>> However, I didn't find any setting to achieve this. It seems like these
>> checkpointing tasks are scheduled in the *asyncOperationsThreadPool* that
>> resides in *StreamTask.java* and this pool seems to be unbounded.
>> Do you think that having an upper bound to this thread pool would achieve
>> the outcome we expect? And if so, is there a way to add this bound?
>>
>> Thanks a lot!
>>
>> Robin
>>
>
>
> --
> Best,
> Hangxiang.
>


Flink reactive mode for application clusters on AWS EKS

2022-12-19 Thread Tamir Sagi
Hey,

We are running stream jobs on application clusters (v1.15.2) on AWS EKS.

I was reviewing the following pages on the Flink Confluence:

  *   Reactive mode [1]
  *   Adaptive Scheduler [2]

I also came across the following POC blog post by Robert Metzger 
(@rmetzger_) from 06 May 2021. [3]

My question is whether this feature will be supported for application 
clusters in the future.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
[3] https://flink.apache.org/2021/05/06/reactive-mode.html


Thanks,
Tamir.

