Re: How to resume a job from checkpoint with the SQL gateway.

2023-07-18 Thread Xiaolong Wang
Thank you for the information. That solution works for me!

On Tue, Jul 18, 2023 at 3:00 PM Shammon FY  wrote:

> Hi Xiaolong,
>
> For new versions such as flink-1.17, the flink sql-gateway supports job
> management, and users can stop/start jobs with savepoints. You can start a job
> from a given savepoint path as in [1] and stop a job with or without a savepoint
> as in [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
>
> Best,
> Shammon FY
>
>
> On Tue, Jul 18, 2023 at 9:56 AM Xiaolong Wang 
> wrote:
>
>> Hi, Shammon,
>>
> I know that the job manager can auto-recover via HA configurations, but
> what if I want to upgrade the running Flink SQL submitted by the Flink SQL
> gateway?
>>
>> In normal cases, I can use the
>>
>>> ./flink run-application -s ${SAVEPOINT_PATH} local://${FLINK_JOB_JAR}
>>
>> to resume a Flink job from a savepoint/checkpoint. The question is, how
>> to do so with the Flink sql gateway? What should I fill in for the
>> ${FLINK_JOB_JAR} field?
>>
>> Thanks in advance.
>>
>> On Mon, Jul 17, 2023 at 9:14 AM Shammon FY  wrote:
>>
>>> Hi Xiaolong,
>>>
>>> When a streaming job is submitted via the Sql-Gateway, its lifecycle is no
>>> longer tied to the Sql Gateway.
>>>
>>> Returning to the issue of job recovery, I think that if your job cluster is
>>> configured with HA, the jobmanager will recover running streaming jobs from
>>> their checkpoints after a failover occurs.
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Thu, Jul 13, 2023 at 10:22 AM Xiaolong Wang <
>>> xiaolong.w...@smartnews.com> wrote:
>>>
 Hi,

 I'm currently working on providing a SQL gateway to submit both
 streaming and batch queries.

 My question is, if a streaming SQL job is submitted and then the jobmanager
 crashes, is it possible to resume the streaming SQL job from the latest
 checkpoint with the SQL gateway?



>>>


Flink 1.15.1 issue we need help on

2023-07-18 Thread Tucker Harvey via user
Hi, we are trying to determine how to fix the following exception. This is an
issue that keeps happening for us repeatedly. We have tried looking online for
solutions. One thread suggested setting idleTimeout, but this doesn't seem to
be supported in the Flink source code.

https://github.com/netty/netty/issues/8801

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to '[redacted-worker]')
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:175)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection timed out

Re: 建议Flink ROWKIND做成元数据metadata

2023-07-18 Thread Feng Jin
Hi casel

There was a similar discussion before, but exposing ROWKIND might lead to
ambiguity in SQL semantics. You could start a discussion on the dev mailing
list to see what others think.

https://issues.apache.org/jira/browse/FLINK-24547
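
For reference, the kafka connector already exposes metadata columns like the
ones below; the commented-out row_kind column is only a hypothetical sketch
of what the proposal could look like, not supported syntax:

  CREATE TABLE kafka_source (
    `user_id` BIGINT,
    -- metadata columns the kafka connector exposes today:
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL
    -- hypothetical, per the proposal (not valid today):
    -- `row_kind` STRING METADATA VIRTUAL
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  );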

Best,
Feng

On Wed, Jul 19, 2023 at 12:06 AM casel.chen  wrote:

> No response from the community?
>
> On 2023-07-15 12:19:46, "casel.chen" wrote:
> >Could the Flink community consider making ROWKIND a metadata column, like
> >offset and partition in the kafka connector? Users could then use this
> >ROWKIND metadata in processing, e.g. filter on specific ROWKIND values or
> >add a ROWKIND field to the output data.
>


Re: 建议Flink ROWKIND做成元数据metadata

2023-07-18 Thread casel.chen
No response from the community?

On 2023-07-15 12:19:46, "casel.chen" wrote:
>Could the Flink community consider making ROWKIND a metadata column, like
>offset and partition in the kafka connector? Users could then use this
>ROWKIND metadata in processing, e.g. filter on specific ROWKIND values or
>add a ROWKIND field to the output data.


Keying a data stream by tenant and performing ML on each sub-stream - help

2023-07-18 Thread Catalin Stavaru
Hello everyone,

Here is my use case: I have an event data stream which I need to key by a
certain field (tenant id) and then for each tenant's events I need to
independently perform ML clustering using FlinkML's OnlineKMeans component.
I am using Java.

I tried different approaches but none of them seems to be correct.

Basically, I try to keep an OnlineKMeansModel instance as per-tenant state
using a keyed processing function on the input DataStream. In the
processing function for the current event, if I cannot retrieve the
OnlineKMeansModel instance for the event's tenant id, I will create one and
store it as state for that tenant id, to use it in the future.

However, this doesn't seem to be the correct way to do it in Flink; I am
facing many hurdles with this approach.

- The OnlineKMeans takes a table (as in Table API) as input; that table is
basically a view of the event data stream, filtered by a certain tenant id.
How do I go about this?
- The OnlineKMeansModel is provided a table to output its predictions to.
How do I go about this table?
- I get many "this class is not serializable" errors, a sign that I am not
using the correct approach.

etc.

In essence, I feel I am overlooking something fundamental about how to
perform FlinkML computations independently for each key of a keyed data
stream.

In the hope that my use case is clear, I am asking for your help on the
correct approach for this scenario.

Thank you!

-- 
Catalin Stavaru


Re: Checkpoint size smaller than Savepoint size

2023-07-18 Thread Neha . via user
Hi Shammon,

These configs do exist in the Flink WebUI. We have set
exeEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Do
you think that can create issues for the HOP(proctime, some interval,
some interval) windows, with state not being released for checkpoints?
I am really confused about why savepoints are working fine but
checkpoints are not.

On Tue, Jul 18, 2023 at 6:56 AM Shammon FY  wrote:

> Hi Neha,
>
> I think you can first check whether the options `state.backend` and
> `state.backend.incremental` you mentioned above exist under
> `JobManager` -> `Configuration` in the Flink webui. If they do not exist, you
> may be using the wrong conf file.
>
> Best,
> Shammon FY
>
>
> On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:
>
>> Hi Shammon,
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>>
>> This is already set in the Flink conf. Is there anything else that should be
>> taken care of for incremental checkpointing? Is there any related bug in
>> Flink 1.16.1? We have recently migrated to Flink 1.16.1 from Flink 1.13.6.
>> What could be the reason that incremental checkpointing stopped working?
>>
>> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>>
>>> Hi Neha,
>>>
>>> I noticed that the `Checkpointed Data Size` is always equal to the `Full
>>> Checkpoint Data Size`, so I think the job is doing full checkpoints instead
>>> of incremental checkpoints. You can check that.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>>
 Hello Shammon,

 Thank you for your assistance.
 I have already enabled incremental checkpointing; attaching the
 screenshot. Can you please elaborate on what makes you think it is not
 enabled? It might hint at the issue. The problem is that the checkpoint size
 never goes down and keeps increasing, while the savepoint size shows the
 expected behavior of going up and down with the throughput peaks.

 [image: Screenshot 2023-07-17 at 7.49.19 AM.png]


 On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:

> Hi Neha,
>
> I think it is normal for the data size of a savepoint to be smaller
> than the full data size of a checkpoint. Flink uses RocksDB to store
> checkpointed data, which is an LSM-structured store where the same key
> can have multiple version records, while a savepoint traverses all keys
> and stores only one record per key.
>
> But I noticed that you did not enable incremental checkpoints, which
> results in each checkpoint saving the full data. You can refer to [1] for more
> detail and turn it on, which will reduce the data size of the checkpoints.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
> 
>
> Best,
> Shammon FY
>
>
> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>
>> Hello  Shammon FY,
>>
>> It is a production issue for me. Could you please take a look and see if
>> anything can be done?
>>
>> -- Forwarded message -
>> From: Neha . 
>> Date: Fri, Jul 14, 2023 at 4:06 PM
>> Subject: Checkpoint size smaller than Savepoint size
>> To: 
>>
>>
>> Hello,
>>
>> According to Flink's documentation, checkpoints are designed to be
>> lightweight. However, in my Flink pipeline, I have observed that the
>> savepoint sizes are smaller than the checkpoint sizes. Is this expected
>> behavior? What are the possible scenarios that can lead to this situation?
>>
>> Additionally, I have noticed that the checkpoint size in my
>> datastream pipeline continues to grow while the savepoint size behaves as
>> expected. Could this be attributed to the usage of Common Table 
>> Expressions
>> (CTEs) in Flink SQL?
>>
>> Flink version: 1.16.1
>> Incremental checkpointing is enabled.
>> StateBackend: RocksDB
>> Time Characteristic: Ingestion
>>
>> SQL:
>>
>> SELECT
>>   *
>> from
>>   (
>> With Actuals as (
>>   SELECT
>> clientOrderId,
>> Cast(
>>   ValueFromKeyCacheUDF(
>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>   ) as INT
>> ) as zoneId,
>> cityId,
>> case
>>   when status = 'ASSIGNED' then 1
>>   else 0
>> end as acceptance_flag,
>> unicast.proctime
>>   FROM
>> order
>> INNER JOIN unicast_df ON unicast.clientOrderId =
>> order.order_id
>> AND order.proctime BETWEEN unicast.proctime - interval '70'
>> minute
>> AND 

Re: How to resume a job from checkpoint with the SQL gateway.

2023-07-18 Thread Shammon FY
Hi Xiaolong,

For new versions such as flink-1.17, the flink sql-gateway supports job
management, and users can stop/start jobs with savepoints. You can start a job
from a given savepoint path as in [1] and stop a job with or without a savepoint
as in [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
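
To make this concrete, here is a minimal sketch of the statements described
in [1] and [2], assuming a flink-1.17 SQL Client/Gateway session (the
savepoint path, job id, and table names below are placeholders; SHOW JOBS
lists the actual job ids):

  -- resume: point the session at an existing savepoint, then re-submit the job's SQL
  SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
  INSERT INTO sink_table SELECT * FROM source_table;

  -- upgrade: stop the running job, taking a savepoint to resume from later
  STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;

Note there is no ${FLINK_JOB_JAR} involved: the gateway re-runs the SQL
statements themselves, and the new job picks up state from the savepoint path
set in the session.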

Best,
Shammon FY


On Tue, Jul 18, 2023 at 9:56 AM Xiaolong Wang 
wrote:

> Hi, Shammon,
>
> I know that the job manager can auto-recover via HA configurations, but
> what if I want to upgrade the running Flink SQL submitted by the Flink SQL
> gateway ?
>
> In normal cases, I can use the
>
>> ./flink run-application -s ${SAVEPOINT_PATH} local://${FLINK_JOB_JAR}
>
> to resume a Flink job from a savepoint/checkpoint. The question is, how to
> do so with the Flink sql gateway? What should I fill in for the
> ${FLINK_JOB_JAR} field?
>
> Thanks in advance.
>
> On Mon, Jul 17, 2023 at 9:14 AM Shammon FY  wrote:
>
>> Hi Xiaolong,
>>
>> When a streaming job is submitted via the Sql-Gateway, its lifecycle is no
>> longer tied to the Sql Gateway.
>>
>> Returning to the issue of job recovery, I think that if your job cluster is
>> configured with HA, the jobmanager will recover running streaming jobs from
>> their checkpoints after a failover occurs.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Thu, Jul 13, 2023 at 10:22 AM Xiaolong Wang <
>> xiaolong.w...@smartnews.com> wrote:
>>
>>> Hi,
>>>
>>> I'm currently working on providing a SQL gateway to submit both
>>> streaming and batch queries.
>>>
>>> My question is, if a streaming SQL job is submitted and then the jobmanager
>>> crashes, is it possible to resume the streaming SQL job from the latest
>>> checkpoint with the SQL gateway?
>>>
>>>
>>>
>>


Re: Elastic Block Store as checkpoint storage

2023-07-18 Thread Konstantin Knauf
Hi Prabhu,

this should be possible, but it is quite expensive compared to AWS S3, and
you have to remount the EBS volumes to the new TaskManagers in case of a
failure, which takes some non-trivial time and slows down recovery. So,
overall, I don't think it's preferable to S3.

We do use EBS volumes for the local RocksDB working directory, though. We
don't currently remount them on failure, due to the additional latency that
introduces.
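
For reference, a minimal sketch of what the original idea would look like in
flink-conf.yaml, assuming the EBS-backed volume is mounted at the same path
on the JobManager and all TaskManagers (the mount paths are placeholders):

  state.backend: rocksdb
  state.checkpoints.dir: file:///mnt/ebs-checkpoints
  state.savepoints.dir: file:///mnt/ebs-savepoints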

Cheers,

Konstantin

On Wed, Jul 12, 2023 at 18:55, Prabhu Joseph <
prabhujose.ga...@gmail.com> wrote:

> Hi,
>
> We are investigating the feasibility of setting up Elastic Block Store
> (EBS) as checkpoint storage by mounting a volume (a shared local file
> system path) to the JobManager and all the TaskManager pods. I would like to
> hear any feedback on this approach if anyone has already tried it.
>
>
> Thanks,
> Prabhu Joseph
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Flink Table API + Jacoco Plugin

2023-07-18 Thread Brendan Cortez
Hi all!

I'm trying to use the jacoco-maven-plugin and run unit tests for the Flink
Table API, but they fail with the following error (see the attached file
error_log_flink_17.txt for the full stacktrace):
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
...

I'm using:
- Flink:
  - flink-table-api-java-bridge 1.17.1
  - flink-test-utils 1.17.1
  - flink-table-test-utils 1.17.1
- jacoco-maven-plugin 0.8.10
- maven-surefire-plugin 3.0.0-M7

The Flink Table API tests run correctly if:
- jacoco-maven-plugin is not used
- the prepare-agent goal of jacoco-maven-plugin is not used
- Flink version 1.16.1 is used

Best regards,
Brendan Cortez


Re: Async IO For Cassandra

2023-07-18 Thread Giannis Polyzos
Hi Pritam, since this is a lookup to an external system, with network I/O
involved plus the time it takes to compute the results, it might be normal
to notice backpressure there.
Also note that query performance in Cassandra depends heavily on the data
model: how easily the data can be located across the different nodes, and
how much data needs to be read in order to answer the query.
For example, even a filtering query may depend on the number of keys it
needs to find across the nodes; it needs proper partition keys so it can
gather the results fast, fetch the data, perform the operation, etc. It
might also be a good idea to measure how much time the queries take on the
Cassandra side.

Hope this helps,
Best

On Tue, Jul 18, 2023 at 4:40 AM Shammon FY  wrote:

> Hi Pritam,
>
> I'm sorry, but I'm not familiar with Cassandra. If your async function is
> always the root cause of the backpressure, I think you can check the latency
> of the async requests in your function and log some metrics.
>
> By the way, you could add a cache in your async function to speed up
> the lookup requests, which is what we always do in lookup joins for sql jobs.
>
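> As a sketch, this is what a cached lookup join looks like on the SQL side
> (the table names are illustrative, and the cache options shown belong to the
> JDBC connector; there is no built-in SQL lookup connector for Cassandra, so
> in an async function the cache would be hand-rolled, e.g. a Guava cache
> keyed by the lookup key):
>
>   CREATE TABLE dim_customers (
>     id BIGINT,
>     name STRING
>   ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://localhost:3306/mydb',
>     'table-name' = 'customers',
>     'lookup.cache.max-rows' = '10000',
>     'lookup.cache.ttl' = '10min'
>   );
>
>   SELECT o.order_id, c.name
>   FROM orders AS o
>   JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id;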
>
> Best,
> Shammon FY
>
> On Mon, Jul 17, 2023 at 10:09 PM Pritam Agarwala <
> pritamagarwala...@gmail.com> wrote:
>
>> Hi Team,
>>
>>
>> Any input on this will be really helpful.
>>
>>
>> Thanks!
>>
>> On Tue, Jul 11, 2023 at 12:04 PM Pritam Agarwala <
>> pritamagarwala...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>>
>>> I am using "AsyncDataStream.unorderedWait" to connect to Cassandra.
>>> The Cassandra lookup operators are becoming the busiest operators and
>>> creating backpressure, resulting in low throughput.
>>>
>>>
>>> The Cassandra lookup is a very simple query, so I increased the capacity
>>> parameter from 15 to 80 and could see a lower busy % on the Cassandra
>>> operators. I am monitoring the Cassandra open-connections and connected-hosts
>>> metrics and couldn't see any change in these metrics.
>>>
>>>
>>> How is the capacity parameter related to Cassandra open connections and
>>> hosts? If I increase the capacity further, will it have any impact on these
>>> metrics?
>>>
>>> Thanks & Regards,
>>> Pritam
>>>
>>