Unsubscribe

2023-07-17 Thread WD.Z
Unsubscribe

How to use MyBatis correctly in Flink

2023-07-17 Thread lxk
I need to use MyBatis inside Flink to simplify some query work. My current usage is as follows:

public class MybatisUtil {

    private static final Logger LOGGER = LogFactory.createNewLogger("MybatisUtil");
    // One SqlSession cached per thread; note it is never removed or closed here
    private static final ThreadLocal<SqlSession> tl = new ThreadLocal<>();
    private static final SqlSessionFactory factory;

    static {
        // 1. Read the MyBatis configuration file
        InputStream in;
        try {
            in = Resources.getResourceAsStream("batis.xml");
        } catch (IOException e) {
            throw new RuntimeException("Failed to load batis.xml", e);
        }
        // 2. Create the SqlSessionFactory
        factory = new SqlSessionFactoryBuilder().build(in);
    }

    public static SqlSession getSqlSession() {
        SqlSession sqlSession = tl.get();
        if (sqlSession == null) {
            sqlSession = factory.openSession();
            tl.set(sqlSession);
            LOGGER.info("SqlSession created, session: {}, time: {}", sqlSession, LocalTimeUtil.now());
        }
        return sqlSession;
    }
}
The above is the utility class.
I obtain the SqlSession in the open method, then use the mapper in the map method:
public void open(Configuration parameters) throws Exception {
    sqlSession = MybatisUtil.getSqlSession();
}

public List map(HeaderFullWithPreOrder headerFullWithPreOrder) throws Exception {
    SelectAddCartMapper mapper = sqlSession.getMapper(SelectAddCartMapper.class);
    // ... rest omitted
}

I'd like to ask whether this way of using it is correct, and whether the SqlSession needs to be closed. I've seen posts saying that if the SqlSession is not closed, the connections can be exhausted.
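Not from the original thread, but a plain-Java sketch of the lifecycle usually recommended for rich functions: open the session in open(), close it in close(), one session per parallel operator instance rather than a static ThreadLocal that is never cleared. Flink and MyBatis types are replaced by stand-ins here, so none of this is the real API surface:

```java
// MockSqlSession stands in for org.apache.ibatis.session.SqlSession, and
// SelectAddCartFunction mirrors the open()/map()/close() shape of a Flink
// RichMapFunction without depending on Flink.
class MockSqlSession implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() { closed = true; }
}

class SelectAddCartFunction {
    private MockSqlSession sqlSession;

    // Called once per parallel instance, like RichFunction#open
    public void open() {
        // Real code: sqlSession = factory.openSession();
        sqlSession = new MockSqlSession();
    }

    public String map(String order) {
        // Real code: sqlSession.getMapper(SelectAddCartMapper.class)...
        return "looked-up:" + order;
    }

    // Called on shutdown/failure, like RichFunction#close; closing the session
    // returns its JDBC connection so the pool is not drained over restarts.
    public void close() {
        if (sqlSession != null) {
            sqlSession.close();
            sqlSession = null;
        }
    }

    boolean isSessionOpen() { return sqlSession != null; }
}
```

In a real job the same shape applies: keep MybatisUtil for building the SqlSessionFactory (which is thread-safe and can stay static), but open and close the SqlSession in open()/close(). Flink task threads are long-lived, so a static ThreadLocal entry keeps its connection checked out for the lifetime of the task manager.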


Re: Async IO For Cassandra

2023-07-17 Thread Shammon FY
Hi Pritam,

I'm sorry, I'm not familiar with Cassandra. If your async function is
always the root cause of backpressure, I think you can check the latency
of the async requests in your function and log some metrics.

By the way, you could add a cache in your async function to speed up the
lookup requests, which is what we usually do in lookup joins for SQL jobs.


Best,
Shammon FY
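Not part of Shammon's reply, but a minimal sketch of such a cache in plain Java. The capacity is an arbitrary placeholder, and a production cache would also want TTL-based expiry so lookups don't serve stale rows:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache for async lookup results: bounded, access-ordered,
// evicting the least recently used entry once capacity is exceeded.
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruCache(int capacity) {
        super(16, 0.75f, true); // accessOrder = true -> LRU ordering
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }

    // Async callbacks may complete on different threads, so wrap the map.
    static <K, V> Map<K, V> threadSafe(int capacity) {
        return Collections.synchronizedMap(new LruCache<K, V>(capacity));
    }
}
```

In the AsyncFunction you would consult the cache before issuing the Cassandra query and populate it in the result callback, so repeated keys skip the round trip entirely.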

On Mon, Jul 17, 2023 at 10:09 PM Pritam Agarwala <
pritamagarwala...@gmail.com> wrote:

> Hi Team,
>
>
> Any input on this will be really helpful.
>
>
> Thanks!
>
> On Tue, Jul 11, 2023 at 12:04 PM Pritam Agarwala <
> pritamagarwala...@gmail.com> wrote:
>
>> Hi Team,
>>
>>
>> I am using "AsyncDataStream.unorderedWait" to connect to Cassandra. The
>> Cassandra lookup operators are becoming the busiest operators and creating
>> back-pressure, resulting in low throughput.
>>
>>
>> The Cassandra lookup is a very simple query. So I increased the capacity
>> parameter to 80 from 15 and could see low busy % of cassandra operators.  I
>> am monitoring the cassandra open connections and connected host metrics.
>> Couldn't see any change on these metrics.
>>
>>
>> How is the capacity parameter related to cassandra open connections and
>> host ? If I increase capacity more will it have any impact on these metrics
>> ?
>>
>> Thanks & Regards,
>> Pritam
>>
>


Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Shammon FY
Hi Neha,

I think you can first check whether the options `state.backend` and
`state.backend.incremental` you mentioned above exist in
`JobManager`->`Configuration` in Flink webui. If they do not exist, you may
be using the wrong conf file.

Best,
Shammon FY
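For reference, the options in question look like this in flink-conf.yaml (option names per the Flink state-backend docs; the checkpoint directory is a placeholder):

```yaml
# flink-conf.yaml
state.backend: rocksdb            # RocksDB keyed state backend
state.backend.incremental: true   # upload only changed SST files per checkpoint
state.checkpoints.dir: s3://my-bucket/checkpoints   # placeholder location
```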


On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:

> Hi Shammon,
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> This is already set in the flink-conf. Is there anything else that should be
> taken care of for incremental checkpointing? Is there any related bug in
> Flink 1.16.1? We recently migrated to Flink 1.16.1 from Flink 1.13.6.
> What could cause incremental checkpointing to stop working?
>
> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I noticed that the `Checkpointed Data Size` is always equal to `Full
>> Checkpoint Data Size`, so I think the job is using full checkpoints instead of
>> incremental checkpoints; you can check that.
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>
>>> Hello Shammon,
>>>
>>> Thank you for your assistance.
>>> I have already enabled incremental checkpointing; attaching a screenshot.
>>> Can you please elaborate on what makes you think it is not enabled? That
>>> might hint at the issue. The problem is that the checkpoint size is not
>>> going down and keeps increasing, while the savepoint size shows the
>>> correct behavior of going up and down with the throughput peaks.
>>>
>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>
>>>
>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>>
 Hi Neha,

 I think it is normal for the data size of a savepoint to be smaller
 than the full data of a checkpoint. Flink uses rocksdb to store
 checkpointed data, which is an LSM structured storage where the same key
 will have multiple version records, while savepoint will traverse all keys
 and store only one record per key.

 But I noticed that you did not enable incremental checkpoint, which
 resulted in each checkpoint saving full data. You can refer to [1] for more
 detail and turn it on, which will reduce the data size of the checkpoint.

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
 

 Best,
 Shammon FY


 On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:

> Hello  Shammon FY,
>
> It is a production issue for me. Can you please take a look if
> anything can be done?
>
> -- Forwarded message -
> From: Neha . 
> Date: Fri, Jul 14, 2023 at 4:06 PM
> Subject: Checkpoint size smaller than Savepoint size
> To: 
>
>
> Hello,
>
> According to Flink's documentation, Checkpoints are designed to be
> lightweight. However, in my Flink pipeline, I have observed that the
> savepoint sizes are smaller than the checkpoints. Is this expected
> behavior? What are the possible scenarios that can lead to this situation?
>
> Additionally, I have noticed that the checkpoint size in my datastream
> pipeline continues to grow while the savepoint size behaves as expected.
> Could this be attributed to the usage of Common Table Expressions (CTEs) 
> in
> Flink SQL?
>
> Flink version: 1.16.1
> Incremental checkpointing is enabled.
> StateBackend: RocksDB
> Time Characteristic: Ingestion
>
> SQL:
>
> SELECT
>   *
> from
>   (
> With Actuals as (
>   SELECT
> clientOrderId,
> Cast(
>   ValueFromKeyCacheUDF(
> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>   ) as INT
> ) as zoneId,
> cityId,
> case
>   when status = 'ASSIGNED' then 1
>   else 0
> end as acceptance_flag,
> unicast.proctime
>   FROM
> order
> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
> AND order.proctime BETWEEN unicast.proctime - interval '70'
> minute
> AND unicast.proctime + interval '10' minute
> and unicast.status in ('ASSIGNED', 'REJECTED')
> ),
> zone_agg as (
>   select
> zoneId,
> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
> avg(cityId) as cityId,
> COUNT(*) as `unicast_count`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),

Unsubscribe

2023-07-17 Thread wang
Unsubscribe

Re: Async IO For Cassandra

2023-07-17 Thread Pritam Agarwala
Hi Team,


Any input on this will be really helpful.


Thanks!

On Tue, Jul 11, 2023 at 12:04 PM Pritam Agarwala <
pritamagarwala...@gmail.com> wrote:

> Hi Team,
>
>
> I am using "AsyncDataStream.unorderedWait" to connect to Cassandra. The
> Cassandra lookup operators are becoming the busiest operators and creating
> back-pressure, resulting in low throughput.
>
>
> The Cassandra lookup is a very simple query. So I increased the capacity
> parameter to 80 from 15 and could see low busy % of cassandra operators.  I
> am monitoring the cassandra open connections and connected host metrics.
> Couldn't see any change on these metrics.
>
>
> How is the capacity parameter related to cassandra open connections and
> host ? If I increase capacity more will it have any impact on these metrics
> ?
>
> Thanks & Regards,
> Pritam
>


Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Neha . via user
Hi Shammon,

state.backend: rocksdb
state.backend.incremental: true

This is already set in the flink-conf. Is there anything else that should be
taken care of for incremental checkpointing? Is there any related bug in
Flink 1.16.1? We recently migrated to Flink 1.16.1 from Flink 1.13.6.
What could cause incremental checkpointing to stop working?

On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:

> Hi Neha,
>
> I noticed that the `Checkpointed Data Size` is always equal to `Full
> Checkpoint Data Size`, so I think the job is using full checkpoints instead of
> incremental checkpoints; you can check that.
>
> Best,
> Shammon FY
>
> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>
>> Hello Shammon,
>>
>> Thank you for your assistance.
>> I have already enabled incremental checkpointing; attaching a screenshot.
>> Can you please elaborate on what makes you think it is not enabled? That
>> might hint at the issue. The problem is that the checkpoint size is not
>> going down and keeps increasing, while the savepoint size shows the
>> correct behavior of going up and down with the throughput peaks.
>>
>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>
>>
>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>
>>> Hi Neha,
>>>
>>> I think it is normal for the data size of a savepoint to be smaller than
>>> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
>>> data, which is an LSM structured storage where the same key will have
>>> multiple version records, while savepoint will traverse all keys and store
>>> only one record per key.
>>>
>>> But I noticed that you did not enable incremental checkpoint, which
>>> resulted in each checkpoint saving full data. You can refer to [1] for more
>>> detail and turn it on, which will reduce the data size of the checkpoint.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>> 
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>>
 Hello  Shammon FY,

 It is a production issue for me. Can you please take a look if anything
 can be done?

 -- Forwarded message -
 From: Neha . 
 Date: Fri, Jul 14, 2023 at 4:06 PM
 Subject: Checkpoint size smaller than Savepoint size
 To: 


 Hello,

 According to Flink's documentation, Checkpoints are designed to be
 lightweight. However, in my Flink pipeline, I have observed that the
 savepoint sizes are smaller than the checkpoints. Is this expected
 behavior? What are the possible scenarios that can lead to this situation?

 Additionally, I have noticed that the checkpoint size in my datastream
 pipeline continues to grow while the savepoint size behaves as expected.
 Could this be attributed to the usage of Common Table Expressions (CTEs) in
 Flink SQL?

 Flink version: 1.16.1
 Incremental checkpointing is enabled.
 StateBackend: RocksDB
 Time Characteristic: Ingestion

 SQL:

 SELECT
   *
 from
   (
 With Actuals as (
   SELECT
 clientOrderId,
 Cast(
   ValueFromKeyCacheUDF(
 concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
   ) as INT
 ) as zoneId,
 cityId,
 case
   when status = 'ASSIGNED' then 1
   else 0
 end as acceptance_flag,
 unicast.proctime
   FROM
 order
 INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
 AND order.proctime BETWEEN unicast.proctime - interval '70'
 minute
 AND unicast.proctime + interval '10' minute
 and unicast.status in ('ASSIGNED', 'REJECTED')
 ),
 zone_agg as (
   select
 zoneId,
 (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
 avg(cityId) as cityId,
 COUNT(*) as `unicast_count`,
 proctime() as proctime
   from
 Actuals
   group by
 HOP(
   proctime(),
   interval '5' minute,
   interval '30' minute
 ),
 zoneId
 ),
 city_agg as(
   select
 cityId,
 sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
 proctime() as proctime
   from
 Actuals
   group by
 HOP(
   proctime(),
   interval '5' minute,
   interval '30' minute
 ),
 cityId
 ),
 final as (
   

Re: TCP Socket stream scalability

2023-07-17 Thread Flavio Pompermaier
I had a similar situation with my Elasticsearch source, where you don't know
before executing the query (via the scroll API, for example) how many splits you
will find.
How should you handle such situations with the new Source API?

On Mon, Jul 17, 2023 at 10:09 AM Martijn Visser 
wrote:

> Hi Kamal,
>
> It would require you to find a way to create a TCP connection on task
> managers where you would only read the assigned part of the TCP connection.
> Looking at the protocol itself, that most likely would be an issue. A TCP
> connection would also be problematic in case of replays and checkpoint
> integration, since you can't roll back to the previous messages of the TCP
> connection.
>
> Best regards,
>
> Martijn
>
> On Sat, Jul 15, 2023 at 7:20 AM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
>> Hello Community,
>>
>>
>>
>> Please share views for below mail.
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>>
>>
>> *From:* Kamal Mittal via user 
>> *Sent:* 14 July 2023 12:55 PM
>> *To:* user@flink.apache.org
>> *Subject:* TCP Socket stream scalability
>>
>>
>>
>> Hello,
>>
>>
>>
>> Can a TCP socket stream be scaled across task managers, similarly to the file
>> enumerator and source reader below?
>>
>>
>>
>> The job is submitted with a TCP socket source function, and the socket binds
>> to a port once, on a single task manager. Is it possible to open the socket at
>> the job manager and then scale / divide the work among task managers,
>> similarly to below?
>>
>>
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>


Re: PyFlink SQL from Kafka to Iceberg issues

2023-07-17 Thread Martijn Visser
Hi Dani,

Plugins need to be placed in a folder inside the plugins directory, I think
that might be the problem.
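Concretely, Flink's plugin loader expects the layout plugins/<plugin-name>/<jar>, not a jar dropped directly into plugins/. A sketch against the Dockerfile below (the sub-directory name is a convention, not mandated; FLINK_HOME defaults to a scratch dir so this is safe to run anywhere):

```shell
# Each plugin gets its own sub-directory under plugins/.
FLINK_HOME="${FLINK_HOME:-$(mktemp -d)}"
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
# In the Dockerfile this would be:
#   COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/s3-fs-hadoop/
touch "$FLINK_HOME/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.16.1.jar"
ls "$FLINK_HOME/plugins/s3-fs-hadoop"
```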

Best regards,

Martijn

On Sun, Jul 9, 2023 at 7:00 PM Dániel Pálma  wrote:

> Thanks for the tips Martijn!
>
> I've fixed the library versions to 1.16 everywhere and also decided to
> scrap pyflink and go for the sql-client instead to keep things simpler for
> now.
>
> This is the Dockerfile I am using for both the *jobmanager* and the
> *sql-client*
>
> FROM flink:1.16.2-scala_2.12-java11
>
> RUN APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/ \
> && HADOOP_VERSION=3.3.5 \
> && wget 
> ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
> \
> && tar xzvf hadoop-${HADOOP_VERSION}.tar.gz \
> && HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
>
> ENV HADOOP_CLASSPATH /opt/flink/hadoop-3.3.5/etc/hadoop:/opt/flink/
> hadoop-3.3.5/share/hadoop/common/lib/*:/opt/flink/hadoop-3.3.5/share/
> hadoop/common/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs:/opt/flink/
> hadoop-3.3.5/share/hadoop/hdfs/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/
> hdfs/*:/opt/flink/hadoop-3.3.5/share/hadoop/mapreduce/*:/opt/flink/
> hadoop-3.3.5/share/hadoop/yarn:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/
> lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/*
>
> COPY lib/flink-json-1.16.1.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-kafka-1.16.2.jar /opt/flink/lib/
> COPY lib/iceberg-flink-runtime-1.16-1.3.0.jar /opt/flink/lib/
> COPY lib/iceberg-hive-runtime-1.3.0.jar /opt/flink/lib/
> COPY lib/hive-metastore-3.1.3.jar /opt/flink/lib/
> COPY lib/hadoop-aws-3.3.5.jar /opt/flink/lib/
> COPY lib/aws-java-sdk-bundle-1.12.316.jar /opt/flink/lib/
>
> COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/
>
> WORKDIR /opt/flink
>
> I start the sql-client via */opt/flink/bin/sql-client.sh embedded*
>
> I am able to create a sink table with the iceberg connector using the
> following query:
>
> create table if not exists clicks_ib
> (
> `timestamp` STRING,
> event STRING,
> user_id STRING,
> site_id STRING,
> url STRING,
> on_site_seconds INT,
> viewed_percent INT
> )
> with ( 'connector'='iceberg',
> 'catalog-name'='hive_catalog',
> 'uri'='thrift://hivemetastore:9083',
> 'warehouse'='s3a://iceberg');
>
> But when I try to select from it, I run into the following error:
>
> *Flink SQL> select * from default_catalog.default_database.clicks_ib;*
>
>
> *[ERROR] Could not execute SQL statement.
> Reason:org.apache.hadoop.hive.metastore.api.MetaException:
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found*
>
> I'm still a bit confused about where to place which jars, and I'm not exactly
> sure what is missing.
>
> For reference, I'll paste the current full docker-compose.yml below.
>
> version: "3.7"
> services:
>
> sqlclient:
> container_name: sqlclient
> build: .
> command:
> - /opt/flink/bin/sql-client.sh
> - embedded
> depends_on:
> - jobmanager
> environment:
> - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> rest.address: jobmanager
> volumes:
> - ./flink-sql:/etc/sql
>
> jobmanager:
> build: .
> hostname: "jobmanager"
> container_name: "jobmanager"
> expose:
> - "6123"
> ports:
> - "8081:8081"
> command: jobmanager
> environment:
> - JOB_MANAGER_RPC_ADDRESS=jobmanager
> - AWS_ACCESS_KEY_ID=minio
> - AWS_SECRET_ACCESS_KEY=minio123
> - AWS_REGION=us-east-1
>
> taskmanager:
> image: flink:1.16.2-scala_2.12-java11
> hostname: "taskmanager"
> container_name: "taskmanager"
> expose:
> - "6121"
> - "6122"
> depends_on:
> - jobmanager
> command: taskmanager
> links:
> - jobmanager:jobmanager
> environment:
> - JOB_MANAGER_RPC_ADDRESS=jobmanager
> - AWS_ACCESS_KEY_ID=minio
> - AWS_SECRET_ACCESS_KEY=minio123
> - AWS_REGION=us-east-1
>
> mariadb:
> image: 'mariadb:latest'
> hostname: mariadb
> container_name: mariadb
> ports:
> - '3306:3306'
> environment:
> MYSQL_ROOT_PASSWORD: admin
> MYSQL_USER: admin
> MYSQL_PASSWORD: admin
> MYSQL_DATABASE: metastore_db
> volumes:
> - ./mariadb-data:/var/lib/mysql
>
> hivemetastore:
> hostname: hivemetastore
> container_name: hivemetastore
> build:
> context: hive
> ports:
> - '9083:9083'
> environment:
> METASTORE_DB_HOSTNAME: mariadb
> depends_on:
> - mariadb
>
> minio:
> hostname: "minio"
> image: "minio/minio:latest"
> container_name: "minio"
> ports:
> - "9001:9001"
> - "9000:9000"
> command:
> - "server"
> - "/data"
> - "--console-address"
> - ":9001"
> volumes:
> - "minio:/data"
> environment:
> MINIO_ROOT_USER: "minio"
> MINIO_ROOT_PASSWORD: "minio123"
> networks:
> default:
> aliases:
> - iceberg.minio
>
> mc:
> depends_on:
> - "minio"
> image: "minio/mc"
> container_name: "mc"
> entrypoint: >
> /bin/sh -c "
> until (/usr/bin/mc config host add minio http://minio:9000 minio
> minio123) do echo "...waiting..." && sleep 1; done;
> /usr/bin/mc rm -r --force minio/iceberg;
> 

Re: Hadoop Error on ECS Fargate

2023-07-17 Thread Martijn Visser
Hi Mengxi Wang,

Which Flink version are you using?

Best regards,

Martijn

On Thu, Jul 13, 2023 at 3:21 PM Wang, Mengxi X via user <
user@flink.apache.org> wrote:

> Hi community,
>
>
>
> We got this Kerberos error with Hadoop as the file system on an ECS Fargate
> deployment.
>
>
>
> Caused by: org.apache.hadoop.security.KerberosAuthException: failure to
> login: javax.security.auth.login.LoginException:
> java.lang.NullPointerException: invalid null input: name
>
>
>
> Caused by: javax.security.auth.login.LoginException:
> java.lang.NullPointerException: invalid null input: name
>
>
>
> We don’t actually need Kerberos authentication so I’ve added properties to
> disable Hadoop Kerberos authentication to flink-config.yaml and I can see
> from logs they’ve been picked up. But still the errors persist. Can anybody
> help please?
>
>
>
> Best wishes,
>
> Mengxi Wang
>
>
>
> This message is confidential and subject to terms at:
> https://www.jpmorgan.com/emaildisclaimer including on confidential,
> privileged or legal entity information, malicious content and monitoring of
> electronic messages. If you are not the intended recipient, please delete
> this message and notify the sender immediately. Any unauthorized use is
> strictly prohibited.
>
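Not from the thread, but possibly relevant context: `invalid null input: name` usually comes from Hadoop's UnixLoginModule failing to resolve an OS user for the container's UID, which can happen in minimal containers even with Kerberos disabled. One workaround sometimes suggested (untested here, and the user name is a placeholder) is to hand Hadoop an explicit user name:

```shell
# Hypothetical workaround: give Hadoop's UserGroupInformation an explicit
# user so UnixLoginModule does not need a passwd entry for the UID.
export HADOOP_USER_NAME=flink
echo "HADOOP_USER_NAME=$HADOOP_USER_NAME"
```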


Re: TCP Socket stream scalability

2023-07-17 Thread Martijn Visser
Hi Kamal,

It would require you to find a way to create a TCP connection on task
managers where you would only read the assigned part of the TCP connection.
Looking at the protocol itself, that most likely would be an issue. A TCP
connection would also be problematic in case of replays and checkpoint
integration, since you can't roll back to the previous messages of the TCP
connection.

Best regards,

Martijn

On Sat, Jul 15, 2023 at 7:20 AM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> Please share views for below mail.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 14 July 2023 12:55 PM
> *To:* user@flink.apache.org
> *Subject:* TCP Socket stream scalability
>
>
>
> Hello,
>
>
>
> Can a TCP socket stream be scaled across task managers, similarly to the file
> enumerator and source reader below?
>
>
>
> The job is submitted with a TCP socket source function, and the socket binds to
> a port once, on a single task manager. Is it possible to open the socket at the
> job manager and then scale / divide the work among task managers, similarly to
> below?
>
>
>
>
>
> Rgds,
>
> Kamal
>


Unsubscribe

2023-07-17 Thread William Wang



Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Shammon FY
Hi Neha,

I noticed that the `Checkpointed Data Size` is always equal to `Full
Checkpoint Data Size`, so I think the job is using full checkpoints instead of
incremental checkpoints; you can check that.

Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled incremental checkpointing; attaching a screenshot.
> Can you please elaborate on what makes you think it is not enabled? That
> might hint at the issue. The problem is that the checkpoint size is not
> going down and keeps increasing, while the savepoint size shows the
> correct behavior of going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
>> data, which is an LSM structured storage where the same key will have
>> multiple version records, while savepoint will traverse all keys and store
>> only one record per key.
>>
>> But I noticed that you did not enable incremental checkpoint, which
>> resulted in each checkpoint saving full data. You can refer to [1] for more
>> detail and turn it on, which will reduce the data size of the checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>> 
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>
>>> Hello  Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look if anything
>>> can be done?
>>>
>>> -- Forwarded message -
>>> From: Neha . 
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: 
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my datastream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>> With Actuals as (
>>>   SELECT
>>> clientOrderId,
>>> Cast(
>>>   ValueFromKeyCacheUDF(
>>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>   ) as INT
>>> ) as zoneId,
>>> cityId,
>>> case
>>>   when status = 'ASSIGNED' then 1
>>>   else 0
>>> end as acceptance_flag,
>>> unicast.proctime
>>>   FROM
>>> order
>>> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>> AND order.proctime BETWEEN unicast.proctime - interval '70'
>>> minute
>>> AND unicast.proctime + interval '10' minute
>>> and unicast.status in ('ASSIGNED', 'REJECTED')
>>> ),
>>> zone_agg as (
>>>   select
>>> zoneId,
>>> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>> avg(cityId) as cityId,
>>> COUNT(*) as `unicast_count`,
>>> proctime() as proctime
>>>   from
>>> Actuals
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' minute,
>>>   interval '30' minute
>>> ),
>>> zoneId
>>> ),
>>> city_agg as(
>>>   select
>>> cityId,
>>> sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>> proctime() as proctime
>>>   from
>>> Actuals
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' minute,
>>>   interval '30' minute
>>> ),
>>> cityId
>>> ),
>>> final as (
>>>   select
>>> zone_agg.zoneId,
>>> zone_agg.cityId,
>>> avg(zone_agg.unicast_count) as unicast_count,
>>> avg(zone_agg.zone_quotient) as zone_quotient,
>>> avg(city_agg.city_quotient) as city_quotient
>>>   from
>>> city_agg
>>> INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>> AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60'
>>> minute
>>> AND zone_agg.proctime
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5'