Re: Question about time-based operators with RocksDB backend

2024-03-05 Thread Jinzhong Li
Hi Gabriele,

The keyed state APIs (ValueState, ListState, etc.) are supported by all
types of state backends (hashmap, rocksdb, etc.), and the built-in window
operators are implemented internally on top of these state APIs. So you can
use the built-in operators/functions with the RocksDB state backend right
out of the box [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#setting-default-state-backend
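
For illustration, a minimal sketch, assuming a Flink 1.17-style setup with
the flink-statebackend-rocksdb dependency on the classpath (the source, keys,
and window size below are made up): switching the backend is one line, and
the built-in window operator picks it up transparently.

```
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RocksDBWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The only RocksDB-specific line: the window operator below keeps
        // its window contents in keyed state, so it uses RocksDB without
        // any further changes to the windowing code.
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        DataStream<Long> events = env.fromSequence(0, 1_000_000);

        events.keyBy(v -> v % 100)
              .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
              .reduce((a, b) -> a + b) // incremental: one value per key/window
              .print();

        env.execute("rocksdb-window-sketch");
    }
}
```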

Best,
Jinzhong Li


On Tue, Mar 5, 2024 at 10:59 AM Zakelly Lan  wrote:

> Hi Gabriele,
>
> Quick answer: You can use the built-in window operators which have been
> integrated with state backends including RocksDB.
>
>
> Thanks,
> Zakelly
>
> On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
> wrote:
>
>> Hi Gabriele,
>>
>> I'd recommend using the built-in window functions whenever possible, as
>> Flink will automatically manage the state for you, and you won't need to be
>> concerned with state backend details. If your use case fits the
>> reduce/aggregate function pattern, incremental aggregation, which reduces
>> state size, also comes out of the box; this is important for large windows.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Gabriele Mencagli 
>> *Sent:* Monday, March 4, 2024 19:38
>> *To:* user@flink.apache.org 
>> *Subject:* Question about time-based operators with RocksDB backend
>>
>>
>> Dear Flink Community,
>>
>> I am using Flink with the DataStream API and operators implemented as
>> RichFunctions. I know that Flink provides a set of window-based operators
>> with time-based semantics and tumbling/sliding windows.
>>
>> By reading the Flink documentation, I understand that it is possible to
>> change the state backend used for storing the in-flight state of the
>> operators, for example using RocksDB to cope with larger-than-memory
>> state. If I am not wrong, to change the backend transparently (e.g., from
>> in-memory to RocksDB), we have to access state through the proper API, for
>> example the Keyed State API with its different abstractions such as
>> ValueState, ListState, etc., as described in the state documentation.
>>
>> My question concerns using the built-in time-based window operators with
>> the RocksDB backend. Suppose, for example, very large temporal windows and
>> a huge number of keys in the stream. I am wondering whether the built-in
>> window operators of Flink (e.g., with an AggregateFunction or a more
>> generic ProcessWindowFunction, as described in the windows documentation)
>> can be used transparently with RocksDB as the state backend, or whether I
>> have to develop the window operator by hand using the Keyed State API
>> (e.g., ListState, AggregatingState), implementing the underlying window
>> logic manually in the code of the operator's RichFunction (e.g., a
>> FlatMap).
>> Thanks for your support,
>>
>> --
>> Gabriele Mencagli
>>
>>


Re: I used the yarn per job mode to submit tasks, which will end in 4 minutes

2024-03-05 Thread Junrui Lee
Hello,

The issue you're encountering is related to a new heartbeat mechanism
between the client and the job introduced in Flink 1.17. If the job does not
receive any heartbeats from the client within a specific timeout, it cancels
itself to avoid hanging indefinitely.

To address this, you have two options:
1. Run your job in detached mode by adding the -d option to your command
line.
2. Increase the client heartbeat timeout ('client.heartbeat.timeout',
180 seconds by default) to a larger value.
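
For example, starting from the command line in your message (the timeout
value below is arbitrary; -sae only applies in attached mode, so it is
dropped in the detached variant):

```
# Option 1: detached submission
./bin/flink run -d -t yarn-per-job -ys 1 -yjm 1G -ytm 3G -yqu default -p 1 \
  -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
  ./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111 -port <port>

# Option 2: stay attached, but raise the timeout (milliseconds)
./bin/flink run -t yarn-per-job -Dclient.heartbeat.timeout=600000 \
  -ys 1 -yjm 1G -ytm 3G -yqu default -p 1 -sae \
  -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
  ./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111 -port <port>
```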

Best,
Junrui

程意 wrote on Wednesday, March 6, 2024 at 09:53:

> In versions 1.17.1 and 1.18.1, I used the YARN per-job mode to submit
> tasks, and the jobs end after 4 minutes. But I tried it on Flink 1.13.1,
> 1.15.2, and 1.16.3, and all of those were normal.
> Command line for version 1.17.1:
> ```
> ./bin/flink run -t yarn-per-job -ys 1 -yjm 1G -ytm 3G -yqu default -p 1
> -sae -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount
> ./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111
>  -port 
> ```
>
> The logs are printed as follows at 1.17.1 version:
> ```
> 2024-03-05 14:43:08,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1) (a23ddaf520a680f213db5726192b7dc4_90bea66de1c231edf33913ecd54406c1_0_0) switched from INITIALIZING to RUNNING.
> 2024-03-05 14:43:29,232 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:43:59,222 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:44:29,226 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:44:59,218 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:45:29,216 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:45:59,217 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:46:29,217 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
> 2024-03-05 14:46:58,363 WARN  org.apache.flink.runtime.dispatcher.MiniDispatcher [] - The heartbeat from the job client is timeout and cancel the job cd6e02e2d60ea07a21e2809000e078cb. You can adjust the heartbeat interval by 'client.heartbeat.interval' and the timeout by 'client.heartbeat.timeout'
> ```
>
> I am using Hadoop version 3.1.1.
>
>


I used the yarn per job mode to submit tasks, which will end in 4 minutes

2024-03-05 Thread 程意
In versions 1.17.1 and 1.18.1, I used the YARN per-job mode to submit tasks,
and the jobs end after 4 minutes. But I tried it on Flink 1.13.1, 1.15.2, and
1.16.3, and all of those were normal.
Command line for version 1.17.1:
```
./bin/flink run -t yarn-per-job -ys 1 -yjm 1G -ytm 3G -yqu default -p 1 -sae -c 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount 
./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111  -port 

```

The logs are printed as follows at 1.17.1 version:
```
2024-03-05 14:43:08,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1) (a23ddaf520a680f213db5726192b7dc4_90bea66de1c231edf33913ecd54406c1_0_0) switched from INITIALIZING to RUNNING.
2024-03-05 14:43:29,232 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:43:59,222 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:44:29,226 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:44:59,218 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:45:29,216 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:45:59,217 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:46:29,217 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-03-05 14:46:58,363 WARN  org.apache.flink.runtime.dispatcher.MiniDispatcher [] - The heartbeat from the job client is timeout and cancel the job cd6e02e2d60ea07a21e2809000e078cb. You can adjust the heartbeat interval by 'client.heartbeat.interval' and the timeout by 'client.heartbeat.timeout'
```

I am using Hadoop version 3.1.1.



Re: Handling late events with Table API / SQL

2024-03-05 Thread Feng Jin
You can use the CURRENT_WATERMARK(rowtime) function to filter out or select
late rows; please refer to [1] for details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
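
For illustration, a minimal sketch of that filtering, assuming a table like
the one in your message (recreated here with the self-contained datagen
connector). CURRENT_WATERMARK returns NULL until the first watermark is
emitted, so those rows are treated as on-time:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateRowFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Stand-in for the Kafka table from the question.
        tEnv.executeSql(
                "CREATE TABLE some_table (" +
                "  event_time BIGINT," +
                "  event_timestamp AS TO_TIMESTAMP_LTZ(event_time * 1000, 3)," +
                "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'fields.event_time.min' = '0'," +
                "  'fields.event_time.max' = '1000000000')");

        // A row is late when its rowtime is already behind the watermark;
        // this keeps only on-time rows (invert the comparison for late ones).
        tEnv.executeSql(
                "SELECT * FROM some_table " +
                "WHERE CURRENT_WATERMARK(event_timestamp) IS NULL " +
                "   OR event_timestamp > CURRENT_WATERMARK(event_timestamp)")
            .print();
    }
}
```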

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S  wrote:

> Hi,
>
> I am using Flink SQL to create a table something like this :
>
> CREATE TABLE some-table (
> ...,
> ...,
> ...,
> ...,
> event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
> WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'some-topic',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'value.format' = 'csv'
> )
>
> I want to understand how I can deal with late / out-of-order events when
> using Flink SQL / the Table API. How can I collect the late / out-of-order
> events to a side output with the Table API / SQL?
>
> Thanks
>


Re: Handling late events with Table API / SQL

2024-03-05 Thread Xuyang
Hi, out-of-order events can be handled by watermarks. However, for late
events, Flink Table & SQL do not support emitting them to a side output the
way the DataStream API does. There have been some JIRAs related to this
[1][2]. If you really need this feature, you may consider restarting the
related discussion on the dev mailing list.
[1] https://issues.apache.org/jira/browse/FLINK-10031
[2] https://issues.apache.org/jira/browse/FLINK-20527
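
If an approximation is acceptable, one common workaround (my suggestion, not
something these JIRAs provide) is to split the stream with CURRENT_WATERMARK
in a statement set, writing on-time rows to one sink and late rows to
another. A sketch, with datagen/blackhole stand-ins for the real source and
sinks:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class LateSplitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Stand-ins for the real Kafka source and the two sinks.
        tEnv.executeSql(
                "CREATE TABLE some_table (" +
                "  event_time BIGINT," +
                "  event_timestamp AS TO_TIMESTAMP_LTZ(event_time * 1000, 3)," +
                "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'fields.event_time.min' = '0'," +
                "  'fields.event_time.max' = '1000000000')");
        tEnv.executeSql(
                "CREATE TABLE on_time_sink (event_time BIGINT, " +
                "event_timestamp TIMESTAMP_LTZ(3)) WITH ('connector' = 'blackhole')");
        tEnv.executeSql(
                "CREATE TABLE late_sink (event_time BIGINT, " +
                "event_timestamp TIMESTAMP_LTZ(3)) WITH ('connector' = 'blackhole')");

        // Route on-time and late rows to different sinks in one job.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql(
                "INSERT INTO on_time_sink SELECT * FROM some_table " +
                "WHERE CURRENT_WATERMARK(event_timestamp) IS NULL " +
                "   OR event_timestamp > CURRENT_WATERMARK(event_timestamp)");
        set.addInsertSql(
                "INSERT INTO late_sink SELECT * FROM some_table " +
                "WHERE event_timestamp <= CURRENT_WATERMARK(event_timestamp)");
        set.execute();
    }
}
```

Note this only approximates the DataStream side output: the two filters
evaluate the watermark independently, so rows near the watermark boundary
may not be classified exactly as a window operator would classify them.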




--

Best!
Xuyang




At 2024-03-06 01:55:03, "Sunny S"  wrote:

Hi,


I am using Flink SQL to create a table something like this :


CREATE TABLE some-table ( 
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)


I want to understand how I can deal with late / out-of-order events when
using Flink SQL / the Table API. How can I collect the late / out-of-order
events to a side output with the Table API / SQL?


Thanks 

Handling late events with Table API / SQL

2024-03-05 Thread Sunny S
Hi,

I am using Flink SQL to create a table something like this :

CREATE TABLE some-table (
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)

I want to understand how I can deal with late / out-of-order events when
using Flink SQL / the Table API. How can I collect the late / out-of-order
events to a side output with the Table API / SQL?

Thanks


Re: Temporal join on rolling aggregate

2024-03-05 Thread Gyula Fóra
Hi Everyone!

I have discussed this with Sébastien Chevalley; he is going to prepare and
drive the FLIP, and I will assist him along the way.

Thanks
Gyula

On Tue, Mar 5, 2024 at 9:57 AM  wrote:

> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in query, which can be declared for any type of
> source, such as table, view. Similar to databricks provided [1], this needs
> a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>


Re: Temporal join on rolling aggregate

2024-03-05 Thread lorenzo.affetti.ververica.com via user
I do agree with Ron Liu.
This would definitely need a FLIP as it would impact SQL and extend it with the 
equivalent of TimestampAssigners in the Java API.

Is there any existing JIRA here, or is anybody willing to drive a FLIP?
On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
> +1.
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in a query, which could then be declared for any type
> of source, such as a table or a view. Similar to what Databricks provides
> [1], this needs a FLIP.
>
> [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron