Re: Handling late events with Table API / SQL

2024-03-07 Thread Sunny S
Thanks for the response! It is sad that side output for late data is not
supported in the Table API and SQL. I will start a discussion regarding this.

In the meantime, I am trying to use the built-in function
CURRENT_WATERMARK(rowtime) to collect late data. The scenario I have is: I am
creating a table with the Kafka connector and defining the watermark in that
table. The table definition can be found in the quoted mail. Next, I apply a
tumbling window SQL query on this table. I want to collect the late data for
this window operation, but I am not clear how the CURRENT_WATERMARK function
would help me get the late data for the window operator.

Also, I am a bit confused about how we determine whether an event is late
for a window operator. From the WindowOperator code:

protected boolean isElementLate(StreamRecord<IN> element) {
    return (windowAssigner.isEventTime())
            && (element.getTimestamp() + allowedLateness
                    <= internalTimerService.currentWatermark());
}

it seems the operator maintains a currentWatermark. I am trying to understand
how this currentWatermark changes between the time the operator receives the
first event that belongs to a window and the time that window fires.

Please help me understand these.

Thanks

From: Feng Jin 
Sent: 06 March 2024 07:08
To: Sunny S 
Cc: user@flink.apache.org 
Subject: Re: Handling late events with Table API / SQL


You can use the  CURRENT_WATERMARK(rowtime)  function for some filtering, 
please refer to [1] for details.


https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
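
For example, here is a minimal, hedged sketch of routing late rows to a separate sink with that
function (the table names and the extra payload column are made up, the 'print' connector is just a
placeholder, and the watermark definition mirrors the DDL quoted below):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateRowsToSideSink {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source table: same watermark definition as the DDL below, plus a made-up payload column.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  event_time BIGINT,"
                        + "  payload STRING,"
                        + "  event_timestamp AS TO_TIMESTAMP_LTZ(event_time * 1000, 3),"
                        + "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'some-topic',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'properties.group.id' = 'testGroup',"
                        + "  'value.format' = 'csv')");

        // Hypothetical side sink for late rows.
        tEnv.executeSql(
                "CREATE TABLE late_orders ("
                        + "  event_time BIGINT, payload STRING, event_timestamp TIMESTAMP_LTZ(3)"
                        + ") WITH ('connector' = 'print')");

        // CURRENT_WATERMARK(event_timestamp) is NULL until the first watermark is emitted;
        // rows at or behind the watermark are the ones a downstream window would drop as late.
        tEnv.executeSql(
                "INSERT INTO late_orders "
                        + "SELECT event_time, payload, event_timestamp FROM orders "
                        + "WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL "
                        + "  AND event_timestamp <= CURRENT_WATERMARK(event_timestamp)");
    }
}
```

The main job would run the windowed aggregation (or the opposite predicate) on the same table.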

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S <sunny8...@outlook.in> wrote:
Hi,

I am using Flink SQL to create a table something like this :

CREATE TABLE some-table (
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)

I want to understand how can I deal with late events / out of order events when 
using Flink SQL / Table API? How can I collect the late / out of order events 
to a side output with Table API / SQL?

Thanks


Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Jad Naous
Hi,
The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
possible to use them with SQL?
Thanks,
Jad Naous

Grepr, CEO/Founder



Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that the Java code prints `State: Null`, `State: Null`, as I was
expecting, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
> work. I have reproduced the exact same code in Java and it works!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I try 
> to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print a `None` state value all
> the time. But it prints `None` and then the stored state value.
>
> ```python
> import time
>
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .cleanup_incrementally(cleanup_size=10, 
> run_cleanup_for_every_record=True)
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             
> .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         current_state = self.state.value()
>
>         print(datetime.now(), current_state)
>
>         if current_state is None:
>             self.state.update(str(datetime.now()))
>
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # - Init environment
>
>     environment = 
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Setup pipeline
>
>     (
>         environment.set_parallelism(1)
>         .from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>
>
>
>     )
>
>     # - Execute pipeline
>
>     environment.execute("ttl_test")
>
>
>
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {
>
>
>     private transient ValueState<String> state;
>
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
> //                .cleanupFullSnapshot()
>                 .cleanupIncrementally(10, true)
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         var stateDescriptor = new ValueStateDescriptor<>("state", 
> String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>
>         state = getRuntimeContext().getState(stateDescriptor);
>
>     }
>
>     @Override
>     public void processElement(String event, Context context,
> Collector<String> collector) throws IOException, InterruptedException {
>         var currentState = state.value();
>         System.out.println("State: " + currentState);
>
>         if (currentState == null) {
>             currentState = LocalDateTime.now().toString();
>             state.update(currentState);
>         }
>
>         Thread.sleep(1500);
>     }
> }```


TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so - how can I report it? If not - what can I try to 
do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print a `None` state value all
the time. But it prints `None` and then the stored state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, 
run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = 
StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())



    )

    # - Execute pipeline

    environment.execute("ttl_test")



```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {


    private transient ValueState<String> state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        var currentState = state.value();
        System.out.println("State: " + currentState);

        if (currentState == null) {
            currentState = LocalDateTime.now().toString();
            state.update(currentState);
        }

        Thread.sleep(1500);
    }
}```


Failed to register an avro schema with the confluent schema registry

2024-03-07 Thread casel.chen
I get an error when registering the schema of a kafka topic with the confluent schema registry.
I would like to know what causes this problem and how to fix it.




io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema for subject 
"rtdp_test-test_schema-value", details: 
[{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/10/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/11/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{oldSchemaVersion: 4}, {oldSchema: 
'{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'},
 {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)




maven dependency:



<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.3.1</version>
</dependency>






java code:

Schema decimalSchema = LogicalTypes.decimal(precision, 
scale).addToSchema(SchemaBuilder.builder().bytesType());

data = 
data.name(columnName).type().unionOf().nullType().and().type(decimalSchema).endUnion().nullDefault();




The salary field is of decimal type. The error says that an earlier schema version does not have
the salary field, and that the salary field definition in the new schema is missing a default
value. But I clearly set nullDefault, which can also be verified from the generated avro schema
json string:




{
    "type": "record",
    "name": "Envelope",
    "namespace": "rtdp_test-test_schema",
    "fields": [
        { "name": "database", "type": "string" },
        { "name": "es", "type": "long" },
        { "name": "id", "type": "int" },
        { "name": "isDdl", "type": "boolean" },
        { "name": "sql", "type": "string" },
        { "name": "table", "type": "string" },
        { "name": "ts", "type": "long" },
        { "name": "type", "type": "string" },
        { "name": "pkNames", "type": { "type": "array", "items": "string" } },
        {
            "name": "data",
            "type": [
                {
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": "Value",
                        "fields": [
                            { "name": "id", "type": [ "null", "long" ], "default": null },
                            {
                                "name": "create_time",
                                "type": [ "null", { "type": "long", "logicalType": "timestamp-millis" } ],
                                "default": null
                            },
                            {


Re:Re: Running Flink SQL in production

2024-03-07 Thread Xuyang
Hi.
Hmm, correct me if I'm mistaken, but using the SQL Client might not be very
convenient for those who need to verify the results of submissions, such as
checking for exceptions related to submission failures, and so on.




--

Best!
Xuyang




At 2024-03-07 17:32:07, "Robin Moffatt" wrote:

Thanks for the reply. 
In terms of production, my thinking is you'll have your SQL in a file under 
code control. Whether that SQL ends up getting submitted via an invocation of 
SQL Client with -f or via REST API seems moot. WDYT? 







On Thu, 7 Mar 2024 at 01:53, Xuyang  wrote:

Hi, IMO, both the SQL Client and the RESTful API can provide connections to the
SQL Gateway service for submitting jobs. A slight difference is that the SQL
Client also offers a command-line visual interface for users to view results.
In your production scenario, placing the SQL to be submitted into a file and then
using the '-f' option of the SQL Client to submit the file sounds a bit
roundabout. Why not just use the RESTful API to submit it directly?




--

Best!
Xuyang




At 2024-03-07 04:11:01, "Robin Moffatt via user"  wrote:

I'm reading the deployment guide[1] and wanted to check my understanding. For 
deploying a SQL job into production, would the pattern be to write the SQL in a 
file that's under source control, and pass that file as an argument to SQL 
Client with -f argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API? 


Are there pros and cons to each approach?


thanks, Robin


[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files

RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Gabor,

The issue is that the read permission is not checked when the Flink FileSource
lists the files under the given source directory.
This happens because the SecurityManager is null.

public String[] list() {
    SecurityManager security = System.getSecurityManager(); // <-- the SecurityManager is null here
    if (security != null) {
        security.checkRead(path);
    }
    if (isInvalid()) {
        return null;
    }
    return fs.list(this);
}

While debugging, I found a method in FlinkSecurityManager like the one below, hence
I suspected it and asked about the role of FlinkSecurityManager.


public static void setFromConfiguration(Configuration configuration) {
    final FlinkSecurityManager flinkSecurityManager =
            FlinkSecurityManager.fromConfiguration(configuration);
    if (flinkSecurityManager != null) {
        try {
            System.setSecurityManager(flinkSecurityManager);
        } catch (Exception e) {
            …
            …

Regards,
Kirti Dhar

From: Gabor Somogyi 
Sent: Wednesday, March 6, 2024 7:17 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti,

Not sure what is the exact issue here but I'm not convinced that having 
FlinkSecurityManager is going to solve it.
Here is the condition however:
* cluster.intercept-user-system-exit != DISABLED (this must be changed)
* cluster.processes.halt-on-fatal-error == false (this is good by default)

Here is a gist what Flink's SecurityManager does:
/**
 * {@code FlinkSecurityManager} to control certain behaviors that can be 
captured by Java system
 * security manager. It can be used to control unexpected user behaviors that 
potentially impact
 * cluster availability, for example, it can warn or prevent user code from 
terminating JVM by
 * System.exit or halt by logging or throwing an exception. This does not 
necessarily prevent
 * malicious users who try to tweak security manager on their own, but more for 
being dependable
 * against user mistakes by gracefully handling them informing users rather 
than causing silent
 * unavailability.
 */

G


On Wed, Mar 6, 2024 at 11:10 AM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue: if the source directory does not have read permission, it
returns the list of files as null instead of throwing a permission exception
(refer to the highlighted line below), resulting in an NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}

While debugging the issue, I found that the SecurityManager is null while listing
the files, hence the permission check on the directory is skipped.
What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar



RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Yanfei,

I am facing this issue on jdk 1.8/11.
Thanks for the pointer, I will try to set the SecurityManager and check the
behaviour.

Regards,
Kirti Dhar

-Original Message-
From: Yanfei Lei  
Sent: Wednesday, March 6, 2024 4:37 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti Dhar,
What is your java version? I guess this problem may be related to 
FLINK-33309[1]. Maybe you can try adding "-Djava.security.manager" to the java 
options.

[1] https://issues.apache.org/jira/browse/FLINK-33309

Kirti Dhar Upadhyay K via user wrote on Wed, Mar 6, 2024 at 18:10:
>
> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue, if source directory does not has read permission, it is 
> returning the list of files as null instead of throwing permission exception 
> (refer the highlighted line below), resulting in NPE.
>
>
>
> final FileStatus[] containedFiles = 
> fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target); }
>
> Debugging the issue found that, SecurityManager is coming as null while 
> listing the files, hence skipping the permissions on directory.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>



--
Best,
Yanfei


RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Hang,

You got it right. The problem is exactly at the line you pointed to [1].
I have used the solution below for now.

```
if (!Files.isReadable(Paths.get(fileStatus.getPath().getPath()))) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());

for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

However, if you step into localf.list(), it automatically checks the read
permission using the SecurityManager. This check is skipped because the
SecurityManager is null, hence my suspicion about the SecurityManager.

[1] 
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38


Regards,
Kirti Dhar

From: Hang Ruan 
Sent: Wednesday, March 6, 2024 6:46 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi, Kirti.

Could you please provide the stack trace of this NPE? I checked the code and I
think the problem may lie in LocalFileSystem#listStatus.
The code in line 161[1] may return null, which will let 
LocalFileSystem#listStatus return null. Then the `containedFiles` is null and 
the NPE occurs.
I think we should add code to handle this situation as follows.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
if (containedFiles == null) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Best,
Hang

[1] 
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote on Wed, Mar 6, 2024 at 18:10:
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue: if the source directory does not have read permission, it
returns the list of files as null instead of throwing a permission exception
(refer to the highlighted line below), resulting in an NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}

While debugging the issue, I found that the SecurityManager is null while listing
the files, hence the permission check on the directory is skipped.
What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar



Re:Reading Iceberg Tables in Batch mode performance

2024-03-06 Thread Xuyang
Hi, can you provide more details about this Flink batch job? For instance, a
flame graph would show which tasks the threads spend most of their time on.




--

Best!
Xuyang




At 2024-03-07 08:40:32, "Charles Tan"  wrote:

Hi all,


I have been looking into using Flink in batch mode to process Iceberg tables. I 
noticed that the performance for queries in Flink's batch mode is quite slow, 
especially when compared to Spark. I'm wondering if there are any 
configurations that I'm missing to get better performance out of reading from 
Iceberg.


In the Flink SQL shell, I ran the following:
1. SET execution.runtime-mode = batch;

2. SELECT COUNT(*) FROM t;


The table t has about 2mb of data and this query took 24 seconds for Flink to 
run. This is compared to the 2.4 seconds it took for Spark to execute the same 
query.


Flink version 1.17.1
Spark version 3.5.1


Any insights or suggestions would be appreciated.


Thanks,
Charles

Re:Running Flink SQL in production

2024-03-06 Thread Xuyang
Hi, IMO, both the SQL Client and the RESTful API can provide connections to the
SQL Gateway service for submitting jobs. A slight difference is that the SQL
Client also offers a command-line visual interface for users to view results.
In your production scenario, placing the SQL to be submitted into a file and then
using the '-f' option of the SQL Client to submit the file sounds a bit
roundabout. Why not just use the RESTful API to submit it directly?




--

Best!
Xuyang




At 2024-03-07 04:11:01, "Robin Moffatt via user"  wrote:

I'm reading the deployment guide[1] and wanted to check my understanding. For 
deploying a SQL job into production, would the pattern be to write the SQL in a 
file that's under source control, and pass that file as an argument to SQL 
Client with -f argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API? 


Are there pros and cons to each approach?


thanks, Robin


[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files

Reading Iceberg Tables in Batch mode performance

2024-03-06 Thread Charles Tan
Hi all,

I have been looking into using Flink in batch mode to process Iceberg
tables. I noticed that the performance for queries in Flink's batch mode is
quite slow, especially when compared to Spark. I'm wondering if there are
any configurations that I'm missing to get better performance out of
reading from Iceberg.

In the Flink SQL shell, I ran the following:
1. SET execution.runtime-mode = batch;
2. SELECT COUNT(*) FROM t;

The table t has about 2mb of data and this query took 24 seconds for Flink
to run. This is compared to the 2.4 seconds it took for Spark to execute
the same query.

Flink version 1.17.1
Spark version 3.5.1

Any insights or suggestions would be appreciated.

Thanks,
Charles


Running Flink SQL in production

2024-03-06 Thread Robin Moffatt via user
I'm reading the deployment guide[1] and wanted to check my understanding.
For deploying a SQL job into production, would the pattern be to write the
SQL in a file that's under source control, and pass that file as an
argument to SQL Client with -f argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API?

Are there pros and cons to each approach?

thanks, Robin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files


Re: SecurityManager in Flink

2024-03-06 Thread Gabor Somogyi
Hi Kirti,

Not sure what is the exact issue here but I'm not convinced that
having FlinkSecurityManager is going to solve it.
Here is the condition however:
* cluster.intercept-user-system-exit != DISABLED (this must be changed)
* cluster.processes.halt-on-fatal-error == false (this is good by default)
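
As a hedged sketch, the same two settings expressed on a Configuration object (a hypothetical
helper class; in a real deployment these keys would normally go into flink-conf.yaml, and THROW
is just one of the non-DISABLED choices):

```java
import org.apache.flink.configuration.Configuration;

public class FlinkSecurityManagerConfigSketch {

    // Builds a Configuration with the two settings mentioned above.
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Must be something other than DISABLED (e.g. LOG or THROW) for interception to be active.
        conf.setString("cluster.intercept-user-system-exit", "THROW");
        // false is the default and is the value FlinkSecurityManager expects here.
        conf.setBoolean("cluster.processes.halt-on-fatal-error", false);
        return conf;
    }
}
```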

Here is a gist what Flink's SecurityManager does:
/**
 * {@code FlinkSecurityManager} to control certain behaviors that can be
captured by Java system
 * security manager. It can be used to control unexpected user behaviors
that potentially impact
 * cluster availability, for example, it can warn or prevent user code from
terminating JVM by
 * System.exit or halt by logging or throwing an exception. This does not
necessarily prevent
 * malicious users who try to tweak security manager on their own, but more
for being dependable
 * against user mistakes by gracefully handling them informing users rather
than causing silent
 * unavailability.
 */

G


On Wed, Mar 6, 2024 at 11:10 AM Kirti Dhar Upadhyay K via user <
user@flink.apache.org> wrote:

> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue, if source directory does not has read permission, it
> is returning the list of files as null instead of throwing permission
> exception (refer the highlighted line below), resulting in NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue found that, SecurityManager is coming as null while
> listing the files, hence skipping the permissions on directory.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>


Re: SecurityManager in Flink

2024-03-06 Thread Hang Ruan
Hi, Kirti.

Could you please provide the stack trace of this NPE? I checked the code and
I think the problem may lie in LocalFileSystem#listStatus.
The code in line 161[1] may return null, which will let
LocalFileSystem#listStatus return null. Then the `containedFiles` is null
and the NPE occurs.
I think we should add code to handle this situation as follows.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
if (containedFiles == null) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Best,
Hang

[1]
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

Kirti Dhar Upadhyay K via user wrote on Wed, Mar 6, 2024 at 18:10:

> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue, if source directory does not has read permission, it
> is returning the list of files as null instead of throwing permission
> exception (refer the highlighted line below), resulting in NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue found that, SecurityManager is coming as null while
> listing the files, hence skipping the permissions on directory.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>


Re: SecurityManager in Flink

2024-03-06 Thread Yanfei Lei
Hi Kirti Dhar,
What is your java version? I guess this problem may be related to
FLINK-33309[1]. Maybe you can try adding "-Djava.security.manager" to
the java options.

[1] https://issues.apache.org/jira/browse/FLINK-33309

Kirti Dhar Upadhyay K via user wrote on Wed, Mar 6, 2024 at 18:10:
>
> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue, if source directory does not has read permission, it is 
> returning the list of files as null instead of throwing permission exception 
> (refer the highlighted line below), resulting in NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
> addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue found that, SecurityManager is coming as null while 
> listing the files, hence skipping the permissions on directory.
>
> What is the way to set SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>



-- 
Best,
Yanfei


Re: Question about time-based operators with RocksDB backend

2024-03-06 Thread xia rui
Hi Gabriele, using (or extending) the window operator provided by Flink is a
better idea. A window operator in Flink manages two types of state:

   - Window state: accumulates data for windows, and provides the data to the
   window function when a window reaches its end time.
   - Timer state: stores the end times of windows, and provides the minimum
   end time to the window operator.


The source code is mainly
in org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.

   - "InternalAppendingState windowState" refers to the
   window state.
   - "InternalTimerService internalTimerService" refers to the timer
   state.

Best regards
Rui Xia

On Mon, Mar 4, 2024 at 7:39 PM Gabriele Mencagli <
gabriele.menca...@gmail.com> wrote:

> Dear Flink Community,
>
> I am using Flink with the DataStream API and operators implemented using
> RichedFunctions. I know that Flink provides a set of window-based operators
> with time-based semantics and tumbling/sliding windows.
>
> By reading the Flink documentation, I understand that there is the
> possibility to change the memory backend utilized for storing the in-flight
> state of the operators. For example, using RocksDB for this purpose to cope
> with a larger-than-memory state. If I am not wrong, to transparently change
> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
> to access the state. For example, the Keyed State API with different
> abstractions such as ValueState, ListState, etc... as reported here
> 
> .
>
> My question is related to the utilization of time-based window operators
> with the RocksDB backend. Suppose for example very large temporal windows
> with a huge number of keys in the stream. I am wondering if there is a
> possibility to use the built-in window operators of Flink (e.g., with an
> AggregateFunction or a more generic ProcessWindowFunction as here
> )
> transparently with RocksDB support as a state back-end, or if I have to
> develop the window operator in a raw manner using the Keyed State API
> (e.g., ListState, AggregateState) for this purpose by implementing the
> underlying window logic manually in the code of RichedFunction of the
> operator (e.g., a FlatMap).
> Thanks for your support,
>
> --
> Gabriele Mencagli
>
>


SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue, if source directory does not has read permission, it is 
returning the list of files as null instead of throwing permission exception 
(refer the highlighted line below), resulting in NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
addSplitsForPath(containedStatus, fs, target);
}
Debugging the issue found that, SecurityManager is coming as null while listing 
the files, hence skipping the permissions on directory.
What is the way to set SecurityManager in Flink?

Regards,
Kirti Dhar



Re: Question about time-based operators with RocksDB backend

2024-03-05 Thread Jinzhong Li
Hi Gabriele,

The keyed state APIs (ValueState, ListState, etc.) are supported by all
types of state backends (hashmap, rocksdb, etc.), and the built-in window
operators are implemented with these state APIs internally. So you can use
these built-in operators/functions with the RocksDB state backend right out
of the box [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#setting-default-state-backend
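
A minimal sketch of what that looks like (class, field and job names are made up; it assumes
flink-statebackend-rocksdb is on the classpath and an event type with a key, a numeric value and
an epoch-millis timestamp):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RocksDbWindowSketch {

    /** Hypothetical event type. */
    public static class Event {
        public String key;
        public long value;
        public long timestampMillis;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep window accumulators and timers in RocksDB instead of the JVM heap.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        Event sample = new Event();
        sample.key = "k";
        sample.value = 1L;
        sample.timestampMillis = 0L;

        env.fromElements(sample)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                                .withTimestampAssigner((e, ts) -> e.timestampMillis))
                .keyBy(e -> e.key)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                // Incremental aggregation: only one Long per key and window is stored.
                .aggregate(new AggregateFunction<Event, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Event e, Long acc) { return acc + e.value; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                })
                .print();

        env.execute("rocksdb-window-sketch");
    }
}
```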

Best,
Jinzhong Li


On Tue, Mar 5, 2024 at 10:59 AM Zakelly Lan  wrote:

> Hi Gabriele,
>
> Quick answer: You can use the built-in window operators which have been
> integrated with state backends including RocksDB.
>
>
> Thanks,
> Zakelly
>
> On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
> wrote:
>
>> Hi Gabriele,
>>
>> I'd recommend extending the existing window function whenever possible,
>> as Flink will automatically cover state management for you and no need to
>> be concerned with state backend details. Incremental aggregation for reduce
>> state size is also out of the box if your usage can be satisfied with the
>> reduce/aggregate function pattern, which is important for large windows.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Gabriele Mencagli 
>> *Sent:* Monday, March 4, 2024 19:38
>> *To:* user@flink.apache.org 
>> *Subject:* Question about time-based operators with RocksDB backend
>>
>>
>> Dear Flink Community,
>>
>> I am using Flink with the DataStream API and operators implemented using
>> RichedFunctions. I know that Flink provides a set of window-based operators
>> with time-based semantics and tumbling/sliding windows.
>>
>> By reading the Flink documentation, I understand that there is the
>> possibility to change the memory backend utilized for storing the in-flight
>> state of the operators. For example, using RocksDB for this purpose to cope
>> with a larger-than-memory state. If I am not wrong, to transparently change
>> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
>> to access the state. For example, the Keyed State API with different
>> abstractions such as ValueState, ListState, etc... as reported here
>> 
>> .
>>
>> My question is related to the utilization of time-based window operators
>> with the RocksDB backend. Suppose for example very large temporal windows
>> with a huge number of keys in the stream. I am wondering if there is a
>> possibility to use the built-in window operators of Flink (e.g., with an
>> AggregateFunction or a more generic ProcessWindowFunction as here
>> )
>> transparently with RocksDB support as a state back-end, or if I have to
>> develop the window operator in a raw manner using the Keyed State API
>> (e.g., ListState, AggregateState) for this purpose by implementing the
>> underlying window logic manually in the code of RichedFunction of the
>> operator (e.g., a FlatMap).
>> Thanks for your support,
>>
>> --
>> Gabriele Mencagli
>>
>>


Re: I used the yarn per job mode to submit tasks, which will end in 4 minutes

2024-03-05 Thread Junrui Lee
Hello,

The issue you're encountering is related to a new heartbeat mechanism
between the client and job in Flink-1.17. If the job does not receive any
heartbeats from the client within a specific timeout, it will cancel itself
to avoid hanging indefinitely.

To address this, you have two options:
1. Run your job in detached mode by adding the -d option in your command
line
2. Increase the client heartbeat timeout setting to a larger value, the
default value is 180 seconds

Best,
Junrui

程意 wrote on Wed, Mar 6, 2024 at 09:53:

> In versions 1.17.1 and 1.18.1, I used the yarn per job mode to submit
> tasks, which will end in 4 minutes.  But I tried it on Flink 1.13.1,
> 1.15.2, and 1.16.3, all of which were normal.
> command line at 1.17.1 version:
> ```
> ./bin/flink run -t yarn-per-job -ys 1 -yjm 1G -ytm 3G -yqu default -p 1
> -sae -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount
> ./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111
>  -port 
> ```
>
> The logs are printed as follows at 1.17.1 version:
> ```
> 2024-03-05 14:43:08,144 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1)
> (a23ddaf520a680f213db5726192b7dc4_90bea66de1c231edf33913ecd54406c1_0_0)
> switched from INITIALIZING to RUNNING. 2024-03-05 14:43:29,232 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:43:59,222 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:44:29,226 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:44:59,218 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:45:29,216 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:45:59,217 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:46:29,217 ERROR
> org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] -
> Exception occurred in REST handler: Request did not match expected format
> JobClientHeartbeatRequestBody. 2024-03-05 14:46:58,363 WARN
>  org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - The
> heartbeat from the job client is timeout and cancel the job
> cd6e02e2d60ea07a21e2809000e078cb. You can adjust the heartbeat interval by
> 'client.heartbeat.interval' and the timeout by 'client.heartbeat.timeout'
> ```
>
> I use hadoop version 3.1.1
>
>


I used the yarn per job mode to submit tasks, which will end in 4 minutes

2024-03-05 Thread 程意
In versions 1.17.1 and 1.18.1, I used the yarn per-job mode to submit tasks, and
they end after 4 minutes. But I tried it on Flink 1.13.1, 1.15.2, and 1.16.3, all
of which were normal.
command line at 1.17.1 version:
```
./bin/flink run -t yarn-per-job -ys 1 -yjm 1G -ytm 3G -yqu default -p 1 -sae -c 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount 
./examples/streaming/SocketWindowWordCount.jar -hostname 192.168.2.111  -port 

```

The logs are printed as follows at 1.17.1 version:
```
2024-03-05 14:43:08,144 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - 
TumblingProcessingTimeWindows -> Sink: Print to Std. Out (1/1) 
(a23ddaf520a680f213db5726192b7dc4_90bea66de1c231edf33913ecd54406c1_0_0) 
switched from INITIALIZING to RUNNING. 2024-03-05 14:43:29,232 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:43:59,222 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:44:29,226 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:44:59,218 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:45:29,216 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:45:59,217 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:46:29,217 ERROR 
org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - 
Exception occurred in REST handler: Request did not match expected format 
JobClientHeartbeatRequestBody. 2024-03-05 14:46:58,363 WARN  
org.apache.flink.runtime.dispatcher.MiniDispatcher   [] - The heartbeat 
from the job client is timeout and cancel the job 
cd6e02e2d60ea07a21e2809000e078cb. You can adjust the heartbeat interval by 
'client.heartbeat.interval' and the timeout by 'client.heartbeat.timeout'
```

I use hadoop version 3.1.1



Re: Handling late events with Table API / SQL

2024-03-05 Thread Feng Jin
You can use the  CURRENT_WATERMARK(rowtime)  function for some filtering,
please refer to [1] for details.


https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S  wrote:

> Hi,
>
> I am using Flink SQL to create a table something like this :
>
> CREATE TABLE some-table (
> ...,
> ...,
> ...,
> ...,
> event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
> WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'some-topic',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'value.format' = 'csv'
> )
>
> I want to understand how can I deal with late events / out of order events
> when using Flink SQL / Table API? How can I collect the late / out of order
> events to a side output with Table API / SQL?
>
> Thanks
>


Re:Handling late events with Table API / SQL

2024-03-05 Thread Xuyang
Hi, for out-of-order events, watermarks can handle them. However, for late
events, Flink Table & SQL do not support outputting them to a side channel the
way the DataStream API does. There have been some JIRAs related to this. [1][2]
If you really need this feature, you may consider starting related
discussions on the dev mailing list again.
  
[1] https://issues.apache.org/jira/browse/FLINK-10031
[2] https://issues.apache.org/jira/browse/FLINK-20527




--

Best!
Xuyang




At 2024-03-06 01:55:03, "Sunny S"  wrote:

Hi,


I am using Flink SQL to create a table something like this :


CREATE TABLE some-table ( 
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)


I want to understand how can I deal with late events / out of order events when 
using Flink SQL / Table API? How can I collect the late / out of order events 
to a side output with Table API / SQL?


Thanks 

Handling late events with Table API / SQL

2024-03-05 Thread Sunny S
Hi,

I am using Flink SQL to create a table something like this :

CREATE TABLE some-table (
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)

I want to understand how can I deal with late events / out of order events when 
using Flink SQL / Table API? How can I collect the late / out of order events 
to a side output with Table API / SQL?

Thanks


Re: Temporal join on rolling aggregate

2024-03-05 Thread Gyula Fóra
Hi Everyone!

I have discussed this with Sébastien Chevalley, he is going to prepare and
drive the FLIP while I will assist him along the way.

Thanks
Gyula

On Tue, Mar 5, 2024 at 9:57 AM  wrote:

> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in query, which can be declared for any type of
> source, such as table, view. Similar to databricks provided [1], this needs
> a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>


Re: Temporal join on rolling aggregate

2024-03-05 Thread lorenzo.affetti.ververica.com via user
I do agree with Ron Liu.
This would definitely need a FLIP as it would impact SQL and extend it with the 
equivalent of TimestampAssigners in the Java API.

Is there any existing JIRA here, or is anybody willing to drive a FLIP?
On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in query, which can be declared for any type of
> source, such as table, view. Similar to databricks provided [1], this needs
> a FLIP.
>
> [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron


Re: Batch mode execution

2024-03-04 Thread irakli.keshel...@sony.com
Thank you both! I'll try to switch the scheduler to "AdaptiveBatchScheduler".

Best,
Irakli

From: Junrui Lee 
Sent: 05 March 2024 03:50
To: user 
Subject: Re: Batch mode execution

Hello Irakli,

The error is due to the fact that the Adaptive Scheduler doesn’t support batch 
jobs, as detailed in the Flink documentation[1]. When operating in reactive 
mode, Flink automatically decides the type of scheduler to use. For batch 
execution, the default scheduler is AdaptiveBatchScheduler, not 
AdaptiveScheduler as in the streaming case.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations

Best regards,
Junrui

lorenzo.affetti.ververica.com via user <user@flink.apache.org> wrote on Mon, Mar 4, 2024 at 23:24:
Hello Irakli and thank you for your question.

I guess that somehow Flink enters the "reactive" mode while the adaptive 
scheduler is not configured.

I would go with 2 options to isolate your issue:

  *   Try forcing the scheduling mode
      (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/elastic_scaling/#adaptive-scheduler)
      in your configuration: `jobmanager.scheduler: adaptive`
  *   Let Flink decide the runtime mode: if the source is bounded, you don't need
      `env.setRuntimeMode(RuntimeExecutionMode.BATCH)`, as Flink will understand that
      correctly.

Can you try one of the two and see if that helps?

For the rest: "running it in the "BATCH" mode was better as I don't have to
deal with the Watermark Strategy". Still, you could opt for a simple
watermarking strategy and stay with the streaming mode (for example,
'BoundedOutOfOrderness':
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector).
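
For example, a minimal sketch of such a strategy (the event type and its timestamp field are
assumptions, not part of the original code):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkSketch {

    /** Hypothetical event type with an epoch-millis timestamp field. */
    public static class MyEvent {
        public long timestampMillis;
    }

    // Tolerates events that arrive up to 30 seconds out of order; pass this to
    // env.fromSource(kafkaSource, strategy, "kafka") instead of noWatermarks().
    public static WatermarkStrategy<MyEvent> strategy() {
        return WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis);
    }
}
```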
On Mar 4, 2024 at 15:54 +0100, irakli.keshel...@sony.com <irakli.keshel...@sony.com> wrote:
Hello,

I have a Flink job which is processing bounded number of events. Initially, I 
was running the job in the "STREAMING" mode, but I realized that running it in 
the "BATCH" mode was better as I don't have to deal with the Watermark 
Strategy. The job is reading the data from the Kafka topic and was running fine 
in the "STREAMING" mode.
I switched the job to the "BATCH" mode by setting 
"env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka Source to 
be bounded by setting ".setBounded(OffsetsInitializer.latest())" to the source 
builder and I updated the Watermark strategy to be 
"WatermarkStrategy.noWatermarks()".
After making these changes and deploying the job I end up with following error: 
"java.lang.IllegalStateException","error.message":"Adaptive Scheduler is 
required for reactive mode". I couldn't find any documentation online which is 
connecting "Adaptive Scheduler" to the "BATCH" processing. Does anyone know 
where this error is coming from and how I can deal with it?

Cheers,
Irakli


Re: Unsubscribe

2024-03-04 Thread Shawn Huang
Hi, to unsubscribe, you can send an email with any content to
user-zh-unsubscr...@flink.apache.org to stop receiving mail from the
user-zh@flink.apache.org mailing list. For managing mailing list subscriptions,
see [1].

[1] https://flink.apache.org/zh/what-is-flink/community/

Best,
Shawn Huang


雷刚 wrote on Thu, Feb 29, 2024 at 14:41:

> Unsubscribe


Re: How to measure the end-to-end latency of a flink sql job

2024-03-04 Thread Shawn Huang
Flink has an end-to-end latency metric; see the documentation below [1] and check whether it helps.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/ops/metrics/#end-to-end-latency-tracking
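
A minimal sketch for turning those latency markers on from code (the interval value is just an
example; for a SQL job submitted through the SQL Client, the equivalent config key is
metrics.latency.interval, and note the markers skip the time spent inside user functions):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Emit a latency marker every 1000 ms; the resulting "latency" histogram metrics
        // approximate source-to-operator latency per operator.
        env.getConfig().setLatencyTrackingInterval(1000L);

        // ... build and execute the actual job here ...
    }
}
```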

Best,
Shawn Huang


casel.chen wrote on Wed, Feb 21, 2024 at 15:31:

> A flink sql job consumes canal json messages (coming from mysql) from kafka,
> does some complex processing, and writes to doris. How can I measure the
> end-to-end latency of the records in the doris table? The mysql table has an
> update_time field that represents the time the business record was updated.
> Doris can add an ingest_time column to the table schema, so on the doris table
> the end-to-end latency can be computed as ingest_time - update_time, but this
> only works offline. Is there a way to measure it in real time so that it can be
> monitored in real time?
>
> I looked at the invoke method of the SinkFunction class: although it has a
> Context parameter that exposes the current processing time and the event time,
> most sinks buffer records into micro-batches before writing, so simply
> subtracting these two times does not reflect the real latency of writing to the
> database. Is there a way to obtain the latency precisely?


Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zakelly Lan
Hi Gabriele,

Quick answer: You can use the built-in window operators which have been
integrated with state backends including RocksDB.


Thanks,
Zakelly

On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
wrote:

> Hi Gabriele,
>
> I'd recommend extending the existing window function whenever possible, as
> Flink will automatically cover state management for you and no need to be
> concerned with state backend details. Incremental aggregation for reduce
> state size is also out of the box if your usage can be satisfied with the
> reduce/aggregate function pattern, which is important for large windows.
>
> Best,
> Zhanghao Chen
> --
> *From:* Gabriele Mencagli 
> *Sent:* Monday, March 4, 2024 19:38
> *To:* user@flink.apache.org 
> *Subject:* Question about time-based operators with RocksDB backend
>
>
> Dear Flink Community,
>
> I am using Flink with the DataStream API and operators implemented using
> RichedFunctions. I know that Flink provides a set of window-based operators
> with time-based semantics and tumbling/sliding windows.
>
> By reading the Flink documentation, I understand that there is the
> possibility to change the memory backend utilized for storing the in-flight
> state of the operators. For example, using RocksDB for this purpose to cope
> with a larger-than-memory state. If I am not wrong, to transparently change
> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
> to access the state. For example, the Keyed State API with different
> abstractions such as ValueState, ListState, etc... as reported here
> 
> .
>
> My question is related to the utilization of time-based window operators
> with the RocksDB backend. Suppose for example very large temporal windows
> with a huge number of keys in the stream. I am wondering if there is a
> possibility to use the built-in window operators of Flink (e.g., with an
> AggregateFunction or a more generic ProcessWindowFunction as here
> )
> transparently with RocksDB support as a state back-end, or if I have to
> develop the window operator in a raw manner using the Keyed State API
> (e.g., ListState, AggregateState) for this purpose by implementing the
> underlying window logic manually in the code of RichedFunction of the
> operator (e.g., a FlatMap).
> Thanks for your support,
>
> --
> Gabriele Mencagli
>
>


Re: Batch mode execution

2024-03-04 Thread Junrui Lee
Hello Irakli,

The error is due to the fact that the Adaptive Scheduler doesn’t support
batch jobs, as detailed in the Flink documentation[1]. When operating in
reactive mode, Flink automatically decides the type of scheduler to use.
For batch execution, the default scheduler is AdaptiveBatchScheduler, not
AdaptiveScheduler as in the streaming case.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations

Best regards,
Junrui

lorenzo.affetti.ververica.com via user wrote on Mon, Mar 4, 2024 at 23:24:

> Hello Irakli and thank you for your question.
>
> I guess that somehow Flink enters the "reactive" mode while the adaptive
> scheduler is not configured.
>
> I would go with 2 options to isolate your issue:
>
>- Try with forcing the scheduling mode (
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/elastic_scaling/#adaptive-scheduler)
>in your configuration: `jobmanager.scheduler: adaptive`
>-  Let Flink decide for the runtime mode: if the source is bounded,
>you don't need `env.setRuntimeMode(RuntimeExecutionMode.BATCH)`, as Flink
>will understand that correctly.
>
>
> Can you try one of the two and see if that helps?
>
> For the rest: "running it in the "BATCH" mode was better as I don't have
> to deal with the Watermark Strategy". Still, you could opt for a simple
> watermarking strategy and stay with the streaming mode (for example,
> 'BoundedOutOfOrcerness':
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> ).
> On Mar 4, 2024 at 15:54 +0100, irakli.keshel...@sony.com <
> irakli.keshel...@sony.com>, wrote:
>
> Hello,
>
> I have a Flink job which is processing bounded number of events.
> Initially, I was running the job in the "STREAMING" mode, but I realized
> that running it in the "BATCH" mode was better as I don't have to deal with
> the Watermark Strategy. The job is reading the data from the Kafka topic
> and was running fine in the "STREAMING" mode.
> I switched the job to the "BATCH" mode by setting
> "env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka
> Source to be bounded by setting ".setBounded(OffsetsInitializer.latest())"
> to the source builder and I updated the Watermark strategy to be
> "WatermarkStrategy.noWatermarks()".
> After making these changes and deploying the job I end up with following
> error: "java.lang.IllegalStateException","error.message":"Adaptive
> Scheduler is required for reactive mode". I couldn't find any documentation
> online which is connecting "Adaptive Scheduler" to the "BATCH" processing.
> Does anyone know where this error is coming from and how I can deal with it?
>
> Cheers,
> Irakli
>
>


Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zhanghao Chen
Hi Gabriele,

I'd recommend building on the existing window functions whenever possible, as 
Flink will automatically handle state management for you, with no need to be 
concerned with state backend details. Incremental aggregation to reduce state 
size also comes out of the box if your usage can be satisfied with the 
reduce/aggregate function pattern, which is important for large windows.

Best,
Zhanghao Chen
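
For illustration, a minimal sketch of what this looks like, with the event type replaced by a Tuple3 of (key, value, event-time millis) and made-up window size and out-of-orderness bound; the built-in window operator is used unchanged, only the state backend is swapped to RocksDB:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RocksDbWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Requires the flink-statebackend-rocksdb dependency; all keyed/window state lives in RocksDB.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints

        env.fromElements(
                Tuple3.of("key-a", 3L, 1_000L), Tuple3.of("key-a", 5L, 2_000L), Tuple3.of("key-b", 7L, 2_500L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((t, ts) -> t.f2))
            .keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            // Incremental aggregation: RocksDB stores one small accumulator per key and window.
            .aggregate(new SumAggregate())
            .print();

        env.execute("rocksdb-window-sketch");
    }

    static class SumAggregate implements AggregateFunction<Tuple3<String, Long, Long>, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Tuple3<String, Long, Long> value, Long acc) { return acc + value.f1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}
```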

From: Gabriele Mencagli 
Sent: Monday, March 4, 2024 19:38
To: user@flink.apache.org 
Subject: Question about time-based operators with RocksDB backend


Dear Flink Community,

I am using Flink with the DataStream API and operators implemented using 
RichFunctions. I know that Flink provides a set of window-based operators 
with time-based semantics and tumbling/sliding windows.

By reading the Flink documentation, I understand that there is the possibility 
to change the memory backend utilized for storing the in-flight state of the 
operators. For example, using RocksDB for this purpose to cope with a 
larger-than-memory state. If I am not wrong, to transparently change the 
backend (e.g., from in-memory to RocksDB) we have to use a proper API to access 
the state. For example, the Keyed State API with different abstractions such as 
ValueState, ListState, etc... as reported 
here.

My question is related to the utilization of time-based window operators with 
the RocksDB backend. Suppose for example very large temporal windows with a 
huge number of keys in the stream. I am wondering if there is a possibility to 
use the built-in window operators of Flink (e.g., with an AggregateFunction or 
a more generic ProcessWindowFunction as 
here)
 transparently with RocksDB support as a state back-end, or if I have to 
develop the window operator in a raw manner using the Keyed State API (e.g., 
ListState, AggregateState) for this purpose by implementing the underlying 
window logic manually in the code of a RichFunction of the operator (e.g., a 
FlatMap).

Thanks for your support,

--
Gabriele Mencagli


Re: What SQL type does java.util.Date in a Table map to?

2024-03-04 Thread Xuyang
Hi, 
java.util.Date has no regular SQL type corresponding to it, so the fallback RAW (structured) type is used. 
java.sql.Date, by contrast, maps to SQL's DATE.
For details, see this table: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction




--

Best!
Xuyang
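
For illustration, a hedged sketch of one way to fix this: convert java.util.Date to java.sql.Timestamp before handing the stream to the Table API. OrderRow is an invented class, and the getters on the Orders class from the quoted message below are assumed to exist:

```java
import java.math.BigDecimal;
import java.sql.Timestamp;

/** Same fields as Orders, but addtime uses java.sql.Timestamp so it is extracted as TIMESTAMP, not RAW. */
public class OrderRow {
    public Timestamp addtime;
    public String cusname;
    public BigDecimal price;
    public int status;

    public OrderRow() {}  // public no-arg constructor keeps this a valid Flink POJO

    public OrderRow(Timestamp addtime, String cusname, BigDecimal price, int status) {
        this.addtime = addtime;
        this.cusname = cusname;
        this.price = price;
        this.status = status;
    }

    /** Conversion from the Orders class of the quoted message (getters assumed to exist). */
    public static OrderRow fromOrders(Orders o) {
        return new OrderRow(new Timestamp(o.getAddtime().getTime()),
                o.getCusname(), o.getPrice(), o.getStatus());
    }
}
```

The stream can then be registered with something like tEnv.fromDataStream(streamSource.map(OrderRow::fromOrders)). Note that the sink column price DECIMAL(15, 2) may still need an explicit CAST in the INSERT statement, since the query side is inferred as DECIMAL(38, 18).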





On 2024-03-05 09:23:38, "ha.fen...@aisino.com" wrote:
>Converting from a DataStream into a Table
>DataStream streamSource = env.addSource(new OrdersSourceObject());
>Table table = 
>tEnv.fromDataStream(streamSource).select($("addtime"),$("cusname"),$("price"),$("status"));
>tEnv.createTemporaryView("itemtable",table);
>
>Definition of Orders
>private Date addtime;
>private String cusname;
>private BigDecimal price;
>private int status;
>
>Writing to Kafka
>String creatDDL = "CREATE TABLE kafka_sink (\n" +
>"  `addtime` TIMESTAMP(0),\n" +
>"  `cusname` STRING,\n" +
>"  `price` DECIMAL(15, 2),\n" +
>"  `status` INT\n" +
>") WITH (\n" +
>"  'connector' = 'kafka',\n" +
>"  'format' = 'json'\n" +
>.
>")";
>
>String query = "INSERT INTO kafka_sink SELECT * FROM itemtable;";
>tEnv.executeSql(query);
>
>The error
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Column types of query result and sink for 
>'default_catalog.default_database.kafka_sink' do not match.
>Cause: Incompatible types for sink column 'addtime' at position 0.
>
>Query schema: [addtime: RAW('java.util.Date', '...'), cusname: STRING, price: 
>DECIMAL(38, 18), status: INT NOT NULL]
>Sink schema:  [addtime: TIMESTAMP(0), cusname: STRING, price: DECIMAL(15, 2), 
>status: INT]


Re: Batch mode execution

2024-03-04 Thread lorenzo.affetti.ververica.com via user
Hello Irakli and thank you for your question.

I guess that somehow Flink enters the "reactive" mode while the adaptive 
scheduler is not configured.

I would go with 2 options to isolate your issue:

• Try with forcing the scheduling mode 
(https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/elastic_scaling/#adaptive-scheduler)
 in your configuration: `jobmanager.scheduler: adaptive`
•  Let Flink decide for the runtime mode: if the source is bounded, you don't 
need `env.setRuntimeMode(RuntimeExecutionMode.BATCH)`, as Flink will understand 
that correctly.


Can you try one of the two and see if that helps?

For the rest: "running it in the "BATCH" mode was better as I don't have to 
deal with the Watermark Strategy". Still, you could opt for a simple 
watermarking strategy and stay with the streaming mode (for example, 
'BoundedOutOfOrderness': 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector).
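
For illustration, a minimal sketch of such a strategy (the MyEvent interface and the 20-second bound are assumptions; pass the returned strategy to env.fromSource(...) in place of WatermarkStrategy.noWatermarks()):

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class Watermarks {
    /** Hypothetical event type: only the event-time accessor matters here. */
    public interface MyEvent { long getEventTimeMillis(); }

    // Tolerates up to 20 seconds of out-of-orderness; tune to the topic's actual skew.
    public static WatermarkStrategy<MyEvent> boundedOutOfOrderness() {
        return WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, recordTs) -> event.getEventTimeMillis())
                .withIdleness(Duration.ofMinutes(1)); // optional: keep watermarks advancing on idle partitions
    }
}
```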
On Mar 4, 2024 at 15:54 +0100, irakli.keshel...@sony.com 
, wrote:
> Hello,
>
> I have a Flink job which is processing bounded number of events. Initially, I 
> was running the job in the "STREAMING" mode, but I realized that running it 
> in the "BATCH" mode was better as I don't have to deal with the Watermark 
> Strategy. The job is reading the data from the Kafka topic and was running 
> fine in the "STREAMING" mode.
> I switched the job to the "BATCH" mode by setting 
> "env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka Source 
> to be bounded by setting ".setBounded(OffsetsInitializer.latest())" to the 
> source builder and I updated the Watermark strategy to be 
> "WatermarkStrategy.noWatermarks()".
> After making these changes and deploying the job I end up with following 
> error: "java.lang.IllegalStateException","error.message":"Adaptive Scheduler 
> is required for reactive mode". I couldn't find any documentation online 
> which is connecting "Adaptive Scheduler" to the "BATCH" processing. Does 
> anyone know where this error is coming from and how I can deal with it?
>
> Cheers,
> Irakli


Batch mode execution

2024-03-04 Thread irakli.keshel...@sony.com
Hello,

I have a Flink job which is processing bounded number of events. Initially, I 
was running the job in the "STREAMING" mode, but I realized that running it in 
the "BATCH" mode was better as I don't have to deal with the Watermark 
Strategy. The job is reading the data from the Kafka topic and was running fine 
in the "STREAMING" mode.
I switched the job to the "BATCH" mode by setting 
"env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka Source to 
be bounded by setting ".setBounded(OffsetsInitializer.latest())" to the source 
builder and I updated the Watermark strategy to be 
"WatermarkStrategy.noWatermarks()".
After making these changes and deploying the job I end up with following error: 
"java.lang.IllegalStateException","error.message":"Adaptive Scheduler is 
required for reactive mode". I couldn't find any documentation online which is 
connecting "Adaptive Scheduler" to the "BATCH" processing. Does anyone know 
where this error is coming from and how I can deal with it?

Cheers,
Irakli


Re: flink-operator-1.5.0 supports which versions of Kubernetes

2024-03-04 Thread Gyula Fóra
It should be compatible. There is no compatibility matrix but it is
compatible with most versions that are in use (at the different
companies/users etc)

Gyula

On Thu, Feb 29, 2024 at 6:21 AM 吴圣运  wrote:

> Hi,
>
> I'm using flink-operator-1.5.0 and I need to deploy it to Kubernetes 1.20.
> I want to confirm if this version of flink-operator is compatible with
> Kubernetes 1.20. I cannot find the compatible matrix in the github page.
> Could you help me confirm?
>
> Thanks
> shengyun.wu
>


Question about time-based operators with RocksDB backend

2024-03-04 Thread Gabriele Mencagli

Dear Flink Community,

I am using Flink with the DataStream API and operators implemented using 
RichFunctions. I know that Flink provides a set of window-based 
operators with time-based semantics and tumbling/sliding windows.


By reading the Flink documentation, I understand that there is the 
possibility to change the memory backend utilized for storing the 
in-flight state of the operators. For example, using RocksDB for this 
purpose to cope with a larger-than-memory state. If I am not wrong, to 
transparently change the backend (e.g., from in-memory to RocksDB) we 
have to use a proper API to access the state. For example, the Keyed 
State API with different abstractions such as ValueState, 
ListState, etc... as reported here.


My question is related to the utilization of time-based window operators 
with the RocksDB backend. Suppose for example very large temporal 
windows with a huge number of keys in the stream. I am wondering if 
there is a possibility to use the built-in window operators of Flink 
(e.g., with an AggregateFunction or a more generic ProcessWindowFunction 
as here) 
transparently with RocksDB support as a state back-end, or if I have to 
develop the window operator in a raw manner using the Keyed State API 
(e.g., ListState, AggregateState) for this purpose by implementing the 
underlying window logic manually in the code of a RichFunction of the 
operator (e.g., a FlatMap).


Thanks for your support,

--
Gabriele Mencagli


Re: Support for ConfigMap for Runtime Arguments in Flink Kubernetes Operator

2024-03-04 Thread Surendra Singh Lilhore
Hi Arjun,

I have raised a Jira for this case and attached a patch:

https://issues.apache.org/jira/browse/FLINK-34565

-Surendra

On Wed, Feb 21, 2024 at 12:48 AM Surendra Singh Lilhore <
surendralilh...@apache.org> wrote:

> Hi Arjun,
>
> Yes, direct support for external configuration files within Flink
> ConfigMaps is somewhat restricted. The current method involves simply
> copying two local files from the operator.
>
> Please check : FlinkConfMountDecorator#getLocalLogConfFiles()
> 
>
> You can try a pod template for external configmap.
>
> Thanks
> Surendra
>
> On Mon, Feb 19, 2024 at 11:17 PM arjun s  wrote:
>
>> Hi team,
>>
>> I am currently in the process of deploying Flink on Kubernetes using the
>> Flink Kubernetes Operator and have encountered a scenario where I need to
>> pass runtime arguments to my Flink application from a properties file.
>> Given the dynamic nature of Kubernetes environments and the need for
>> flexibility in configuration management, I was wondering if the Flink
>> Kubernetes Operator supports the use of Kubernetes ConfigMaps for this
>> purpose. Specifically, I am interested in understanding:
>>
>> 1.How can I use a ConfigMap to pass runtime arguments or configurations
>> stored in a properties file to a Flink job deployed using the Kubernetes
>> operator?
>> 2.Are there best practices or recommended approaches for managing
>> application-specific configurations, such as database connections or other
>> external resource settings, using ConfigMaps with the Flink Kubernetes
>> Operator?
>> 3.If direct support for ConfigMaps is not available or limited, could you
>> suggest any workarounds or alternative strategies that align with Flink's
>> deployment model on Kubernetes?
>>
>> I appreciate any guidance or documentation you could provide on this
>> matter, as it would greatly assist in streamlining our deployment process
>> and maintaining configuration flexibility in our Flink applications.
>>
>> Thank you for your time and support. I look forward to your response.
>>
>


Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-03 Thread Zhanghao Chen
To add to Yanquan's answer: what you get from /jobs/:jobid/plan is in fact the JobGraph information represented as JSON 
(generated by the JsonPlanGenerator class and covering most of the commonly used JobGraph information), which should meet your needs.
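
For illustration, a minimal sketch of fetching that plan over the REST API from Java (host, port, and job id are placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchJobPlan {
    public static void main(String[] args) throws Exception {
        String jobId = "5bf08275f1992d1f7997fc8f7c32b6b1";                     // placeholder job id
        URI uri = URI.create("http://localhost:8081/jobs/" + jobId + "/plan"); // placeholder REST address

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());

        // The body is the JSON plan generated from the JobGraph (nodes, operator descriptions, edges).
        System.out.println(response.body());
    }
}
```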

From: casel.chen 
Sent: Saturday, March 2, 2024 14:17
To: user-zh@flink.apache.org 
Subject: Can JobGraph information be obtained from a Flink job's web URL?

Can a running Flink job's JobGraph information be obtained through the web URL it exposes?


Re: Unsubscribe

2024-03-03 Thread Hang Ruan
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from mail coming from
 the user-zh-unsubscr...@flink.apache.org mailing list. You can refer to [1][2] to manage your mailing list subscriptions.

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

4kings...@gmail.com <4kings...@gmail.com> wrote on Sat, Mar 2, 2024 at 19:24:

> Unsubscribe
> 4kings...@gmail.com
> Email: 4kings...@gmail.com


Unsubscribe

2024-03-02 Thread 4kings...@gmail.com
Unsubscribe
4kings...@gmail.com
Email: 4kings...@gmail.com

Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-01 Thread Yanquan Lv
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-plan
Through /jobs/:jobid/plan you can get the ExecutionGraph information; I am not sure whether it contains the information you need.

casel.chen wrote on Sat, Mar 2, 2024 at 14:19:

> Can a running Flink job's JobGraph information be obtained through the web URL it exposes?


Can JobGraph information be obtained from a Flink job's web URL?

2024-03-01 Thread casel.chen
Can a running Flink job's JobGraph information be obtained through the web URL it exposes?

Re: MySQL CDC: DataStream API and SQL API show different output behavior

2024-03-01 Thread Feng Jin
The two print calls are implemented differently.

dataStream().print() adds a PrintSinkFunction; that operator prints records as soon as it
receives them, and the output appears on the TaskManager.

table.execute().print() collects the final result through a collect sink and ships it back to
the client, where it is printed on the client's stdout; results are only shipped back when a
checkpoint completes, so their visibility is bounded by the checkpoint interval.


Best,
Feng Jin
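
For illustration, a minimal sketch of shortening the checkpoint interval so that the SQL API results become visible sooner (the 10-second interval is an arbitrary example value):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A 10s interval means table.execute().print() surfaces CDC changes at most roughly 10s late;
        // shorter intervals trade visibility latency for more checkpointing overhead.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // ... register the MySQL CDC table and run the query as in the quoted message below.
    }
}
```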

On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com 
wrote:

> The sink only prints.
>
> DataStream API, with exactly-once checkpointing:
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
> Source").print();
> After data changes in the database, it is printed to the console immediately.
>
> SQL API, with exactly-once checkpointing:
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> Database changes are not printed immediately; the changed data only appears once a checkpoint completes. Also, right after the program starts, the historical data is likewise only printed after a checkpoint.
> 16:39:17,109 INFO
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] -
> Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.46, pos=11653,
> kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes,
> checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 1 as completed for source Source: orders[1].
>
> ++-+-++---+-+
> | op |  id | addtime |cusname
> | price |  status |
>
> ++-+-++---+-+
> | +I | 616 | 2024-02-22 16:23:11 |   name
> |  3.23 |   7 |
> | +I | 617 | 2024-03-01 11:42:03 |   name
> |  1.11 |   9 |
> | +I | 612 | 2024-01-31 13:53:49 |   name
> |  1.29 |   1 |
>
> What is the reason for this?
>


Re: How does the Debezium inside Flink CDC register schemas with the Confluent Schema Registry?

2024-02-29 Thread Hang Ruan
Hi, casel.chen.

This part is not covered in the CDC project. Flink CDC relies on Debezium's engine module to read the change data
directly, and unlike standalone Debezium it does not write to Kafka itself.
Consider asking the Debezium community about this; the Debezium developers should be more familiar with it.

Best,
Hang

casel.chen wrote on Thu, Feb 29, 2024 at 18:11:

> I searched the Debezium source code but could not find where the
> SchemaRegistryClient.register method is called. How does it register schemas with the
> Confluent Schema Registry?


Re: MySQL CDC: DataStream API and SQL API show different output behavior

2024-02-29 Thread Hang Ruan
Hello, ha.fengqi.

The MySQL CDC connector only relies on checkpoint completion to switch to the incremental phase when it runs with
multiple parallelism. From the code you provided, the job runs with a parallelism of one, so the source itself
should not behave differently between the two cases.
Could the difference instead come from how the downstream behaves in the two usage modes?
You can look at the concrete IO metrics to see whether the source really emits records promptly, and if you are familiar with the code you can also add some log statements to verify it.

Best,
Hang


[ANNOUNCE] Apache flink-connector-parent 1.1.0 released

2024-02-29 Thread Etienne Chauchot
The Apache Flink community is very happy to announce the release of 
Apache flink-connector-parent 1.1.0.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353442

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Etienne Chauchot


How does the Debezium inside Flink CDC register schemas with the Confluent Schema Registry?

2024-02-29 Thread casel.chen
I searched the Debezium source code but could not find where the SchemaRegistryClient.register method is called. 
How does it register schemas with the Confluent Schema Registry?

flink-operator-1.5.0 supports which versions of Kubernetes

2024-02-28 Thread 吴圣运
Hi,

I'm using flink-operator-1.5.0 and I need to deploy it to Kubernetes 1.20. I 
want to confirm if this version of flink-operator is compatible with Kubernetes 
1.20. I cannot find a compatibility matrix on the GitHub page. Could you help me 
confirm?

Thanks 
shengyun.wu

Re: Unsubscribe

2024-02-28 Thread Shawn Huang
Hi, to unsubscribe, send an email with any content to user-zh-unsubscr...@flink.apache.org to stop receiving mail from
the user-zh@flink.apache.org mailing list. For managing mailing list subscriptions, see [1].

[1] https://flink.apache.org/zh/what-is-flink/community/

Best,
Shawn Huang


18679131354 <18679131...@163.com> wrote on Tue, Feb 27, 2024 at 14:32:

> Unsubscribe


Re: Flink restart mechanism

2024-02-27 Thread Yanquan Lv
The images did not come through. Container scheduling is controlled by YARN, and YARN prefers nodes that are up and
running. In principle a container should not be scheduled onto a decommissioned node; have you confirmed this via the YARN web UI or yarn node -list?

chenyu_opensource wrote on Tue, Feb 27, 2024 at 18:30:

> Hello, a Flink job submitted to YARN failed because a node went offline, as follows:
>
> After the retry limit was exceeded, the job failed, as shown in the figure below:
>
> I would like to ask: in Flink's retry mechanism, won't tasks be rescheduled onto containers
> on a new node? Why do they stay on the same node, causing the whole job to fail? Is this
> scheduling controlled by YARN or by Flink's own code? If there is relevant code, please share it as well, thanks.
>
> Looking forward to your reply, thanks!
>


Flink restart mechanism

2024-02-27 Thread chenyu_opensource
Hello, a Flink job submitted to YARN failed because a node went offline, as follows:


After the retry limit was exceeded, the job failed, as shown in the figure below:


I would like to ask: in Flink's retry mechanism, won't tasks be rescheduled onto containers on a new node? 
Why do they stay on the same node, causing the whole job to fail? Is this scheduling controlled by YARN or by Flink's own code? If there is relevant code, please share it as well, thanks.


Looking forward to your reply, thanks!

Unsubscribe

2024-02-26 Thread 18679131354
Unsubscribe

Email from 杨作青

2024-02-26 Thread 杨作青
Unsubscribe

Re: Schema Evolution & Json Schemas

2024-02-26 Thread Salva Alcántara
Awesome Andrew, thanks a lot for the info!

On Sun, Feb 25, 2024 at 4:37 PM Andrew Otto  wrote:

> >  the following code generator
> Oh, and FWIW we avoid code generation and POJOs, and instead rely on
> Flink's Row or RowData abstractions.
>
>
>
>
>
> On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto  wrote:
>
>> Hi!
>>
>> I'm not sure if this totally is relevant for you, but we use JSONSchema
>> and JSON with Flink at the Wikimedia Foundation.
>> We explicitly disallow the use of additionalProperties, unless it is to
>> define Map type fields (where additionalProperties itself is a schema).
>>
>> We have JSONSchema converters and JSON Serdes to be able to use our
>> JSONSchemas and JSON records with both the DataStream API (as Row) and
>> Table API (as RowData).
>>
>> See:
>> -
>> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
>> -
>> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object
>>
>> State schema evolution is supported via the EventRowTypeInfo wrapper.
>>
>> Less directly about Flink: I gave a talk at Confluent's Current conf in
>> 2022 about why we use JSONSchema. See also this blog post series if you
>> are interested!
>>
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>
>> On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara 
>> wrote:
>>
>>> I'm facing some issues related to schema evolution in combination with
>>> the usage of Json Schemas and I was just wondering whether there are any
>>> recommended best practices.
>>>
>>> In particular, I'm using the following code generator:
>>>
>>> - https://github.com/joelittlejohn/jsonschema2pojo
>>>
>>> Main gotchas so far relate to the `additionalProperties` field. When
>>> setting that to true, the resulting POJO is not valid according to Flink
>>> rules because the generated getter/setter methods don't follow the java
>>> beans naming conventions, e.g., see here:
>>>
>>> - https://github.com/joelittlejohn/jsonschema2pojo/issues/1589
>>>
>>> This means that the Kryo fallback is used for serialization purposes,
>>> which is not only bad for performance but also breaks state schema
>>> evolution.
>>>
>>> So, because of that, setting `additionalProperties` to `false` looks
>>> like a good idea but then your job will break if an upstream/producer
>>> service adds a property to the messages you are reading. To solve this
>>> problem, the POJOs for your job (as a reader) can be generated to ignore
>>> the `additionalProperties` field (via the `@JsonIgnore` Jackson
>>> annotation). This seems to be a good overall solution to the problem, but
>>> looks a bit convoluted to me / didn't come without some trial & error (=
>>> pain & frustration).
>>>
>>> Is there anyone here facing similar issues? It would be good to hear
>>> your thoughts on this!
>>>
>>> BTW, this is very interesting article that touches on the above
>>> mentioned difficulties:
>>> -
>>> https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
>>>
>>>
>>>


Re: How can a Flink DataStream job obtain its job lineage?

2024-02-26 Thread Feng Jin
From the JobGraph you can obtain the transformation information, and from that the concrete Source or Doris
Sink instances; the properties held inside them can then be extracted via reflection.

You can refer to the OpenLineage implementation [1].


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng
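
For illustration, a small hedged sketch of the reflection step mentioned above; the field name to read is connector-internal and version-specific, so treat it as an assumption:

```java
import java.lang.reflect.Field;

public final class ReflectionUtil {
    /**
     * Reads a private field (for example, connection properties held inside a source or sink instance).
     * The exact field names are internal to each connector and may change between versions.
     */
    public static Object readField(Object target, String fieldName) throws ReflectiveOperationException {
        Field f = target.getClass().getDeclaredField(fieldName);
        f.setAccessible(true);
        return f.get(target);
    }
}
```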


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

> A Flink DataStream job consumes from MySQL CDC, processes the data, and writes to Apache
> Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink connector
> information, including the connection string, database name, table name, etc.?


How can a Flink DataStream job obtain its job lineage?

2024-02-26 Thread casel.chen
A Flink DataStream job consumes from MySQL CDC, processes the data, and writes to Apache Doris. 
Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink connector information, including the connection string, database name, table name, etc.?

Re: Flink Scala Positions in India or USA !

2024-02-26 Thread Martijn Visser
Hi,

Please don't use the mailing list for this purpose.

Best regards,

Martijn

On Wed, Feb 21, 2024 at 4:08 PM sri hari kali charan Tummala
 wrote:
>
> Hi Folks,
>
> I am currently seeking full-time positions in Flink Scala in India or the USA 
> (non consulting) , specifically at the Principal or Staff level positions in 
> India or USA.
>
> I require an h1b transfer and assistance with relocation from India , my i40 
> is approved.
>
> Thanks & Regards
> Sri Tummala
>


Re: Temporal join on rolling aggregate

2024-02-25 Thread Ron liu
+1,
But I think this should be a more general requirement, that is, support for
declaring watermarks in query, which can be declared for any type of
source, such as table, view. Similar to databricks provided [1], this needs
a FLIP.

[1]
https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html

Best,
Ron


Re: Schema Evolution & Json Schemas

2024-02-25 Thread Andrew Otto
>  the following code generator
Oh, and FWIW we avoid code generation and POJOs, and instead rely on
Flink's Row or RowData abstractions.





On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto  wrote:

> Hi!
>
> I'm not sure if this totally is relevant for you, but we use JSONSchema
> and JSON with Flink at the Wikimedia Foundation.
> We explicitly disallow the use of additionalProperties, unless it is to
> define Map type fields (where additionalProperties itself is a schema).
>
> We have JSONSchema converters and JSON Serdes to be able to use our
> JSONSchemas and JSON records with both the DataStream API (as Row) and
> Table API (as RowData).
>
> See:
> -
> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
> -
> https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object
>
> State schema evolution is supported via the EventRowTypeInfo wrapper.
>
> Less directly about Flink: I gave a talk at Confluent's Current conf in
> 2022 about why we use JSONSchema. See also this blog post series if you
> are interested!
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
> On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara 
> wrote:
>
>> I'm facing some issues related to schema evolution in combination with
>> the usage of Json Schemas and I was just wondering whether there are any
>> recommended best practices.
>>
>> In particular, I'm using the following code generator:
>>
>> - https://github.com/joelittlejohn/jsonschema2pojo
>>
>> Main gotchas so far relate to the `additionalProperties` field. When
>> setting that to true, the resulting POJO is not valid according to Flink
>> rules because the generated getter/setter methods don't follow the java
>> beans naming conventions, e.g., see here:
>>
>> - https://github.com/joelittlejohn/jsonschema2pojo/issues/1589
>>
>> This means that the Kryo fallback is used for serialization purposes,
>> which is not only bad for performance but also breaks state schema
>> evolution.
>>
>> So, because of that, setting `additionalProperties` to `false` looks like
>> a good idea but then your job will break if an upstream/producer service
>> adds a property to the messages you are reading. To solve this problem, the
>> POJOs for your job (as a reader) can be generated to ignore the
>> `additionalProperties` field (via the `@JsonIgnore` Jackson annotation).
>> This seems to be a good overall solution to the problem, but looks a bit
>> convoluted to me / didn't come without some trial & error (= pain &
>> frustration).
>>
>> Is there anyone here facing similar issues? It would be good to hear your
>> thoughts on this!
>>
>> BTW, this is very interesting article that touches on the above mentioned
>> difficulties:
>> -
>> https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
>>
>>
>>


Re: Schema Evolution & Json Schemas

2024-02-25 Thread Andrew Otto
Hi!

I'm not sure if this totally is relevant for you, but we use JSONSchema and
JSON with Flink at the Wikimedia Foundation.
We explicitly disallow the use of additionalProperties, unless it is to define
Map type fields (where additionalProperties itself is a schema).

We have JSONSchema converters and JSON Serdes to be able to use our
JSONSchemas and JSON records with both the DataStream API (as Row) and
Table API (as RowData).

See:
-
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
-
https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object

State schema evolution is supported via the EventRowTypeInfo wrapper.

Less directly about Flink: I gave a talk at Confluent's Current conf in
2022 about why we use JSONSchema. See also this blog post series if you
are interested!

-Andrew Otto
 Wikimedia Foundation


On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara 
wrote:

> I'm facing some issues related to schema evolution in combination with the
> usage of Json Schemas and I was just wondering whether there are any
> recommended best practices.
>
> In particular, I'm using the following code generator:
>
> - https://github.com/joelittlejohn/jsonschema2pojo
>
> Main gotchas so far relate to the `additionalProperties` field. When
> setting that to true, the resulting POJO is not valid according to Flink
> rules because the generated getter/setter methods don't follow the java
> beans naming conventions, e.g., see here:
>
> - https://github.com/joelittlejohn/jsonschema2pojo/issues/1589
>
> This means that the Kryo fallback is used for serialization purposes,
> which is not only bad for performance but also breaks state schema
> evolution.
>
> So, because of that, setting `additionalProperties` to `false` looks like
> a good idea but then your job will break if an upstream/producer service
> adds a property to the messages you are reading. To solve this problem, the
> POJOs for your job (as a reader) can be generated to ignore the
> `additionalProperties` field (via the `@JsonIgnore` Jackson annotation).
> This seems to be a good overall solution to the problem, but looks a bit
> convoluted to me / didn't come without some trial & error (= pain &
> frustration).
>
> Is there anyone here facing similar issues? It would be good to hear your
> thoughts on this!
>
> BTW, this is very interesting article that touches on the above mentioned
> difficulties:
> -
> https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
>
>
>


ClassNotFoundException for External Dependencies in Flink Session Cluster on Kubernetes

2024-02-24 Thread arjun s
Hello team,

I'm currently deploying a Flink session cluster on Kubernetes using the
Flink Kubernetes operator. My Flink job, which utilizes the DataStream API
for its logic, requires several external dependencies, so I've used an
init container to copy all necessary jars to a /mnt/external-jars path,
which is mounted as expected.

To include these external jars in my job, I've added them to the
pipeline.classpaths property in my configuration as follows:

pipeline.classpaths:
file:///mnt/external-jars/mysql-connector-java-8.0.28.jar;file:///mnt/external-jars/flink-connector-kafka-1.17.2.jar;file:///mnt/external-jars/flink-shaded-guava-30.1.1-jre-16.2.jar;file:///mnt/external-jars/kafka-clients-3.4.1.jar

Despite this setup, when submitting my job, I encounter a
java.lang.ClassNotFoundException for com.mysql.jdbc.Driver. This leads me
to question:

Is my understanding correct that the pipeline.classpaths property is
intended for loading external dependencies in this manner?
Does the Flink Kubernetes operator support the pipeline.classpaths
property, or are there known limitations?
Are there any alternative approaches or workarounds within the Kubernetes
operator context to ensure external dependencies are recognized and loaded
correctly by the Flink job?
Any insights or suggestions to resolve this issue would be greatly
appreciated.

Thank you in advance for your help!


Re: Temporal join on rolling aggregate

2024-02-23 Thread Feng Jin
+1 Support this feature. There are many limitations to using time window
aggregation currently, and if we can declare watermark and time attribute
on the view, it will make it easier for us to use time windows. Similarly,
it would be very useful if the primary key could be declared in the view.

Therefore, I believe we need a FLIP to detail the design of this feature.


Best,
Feng

On Fri, Feb 23, 2024 at 2:39 PM  wrote:

> +1 for supporting defining time attributes on views.
>
> I once encountered the same problem as yours. I did some regular joins and
> lost time attribute, and hence I could no longer do window operations in
> subsequent logics. I had to output the joined view to Kafka, read from it
> again, and define a watermark on the new source - a cumbersome workaround.
>
> It would be more flexible if we could control time attribute / watermark
> on views, just as if it's some kind of special source.
>
> Thanks,
> Yaming
> On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> > Posting this to dev as well as it potentially has some implications on
> development effort.
> >
> > What seems to be the problem here is that we cannot control/override
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
> cannot create a PRIMARY KEY on the view but I think the temporal join also
> should not require the PK, should we remove this limitation?
> >
> > The general problem is the inflexibility of the timestamp/watermark
> handling on query outputs, which makes this again impossible.
> >
> > The workaround here can be to write the rolling aggregate to Kafka, read
> it back again and join with that. The fact that this workaround is possible
> actually highlights the need for more flexibility on the query/view side in
> my opinion.
> >
> > Has anyone else run into this issue and considered the proper solution
> to the problem? Feels like it must be pretty common :)
> >
> > Cheers,
> > Gyula
> >
> >
> >
> >
> > > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
> wrote:
> > > > Hi,
> > > >
> > > > I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However it does not work and throws :
> > > >
> > > > org.apache.flink.table.api.ValidationException: Event-Time Temporal
> Table Join requires both primary key and row time attribute in versioned
> table, but no row time attribute can be found.
> > > >
> > > > It seems that after the aggregation, the table looses the watermark
> and it's not possible to add one with the SQL API as it's a view.
> > > >
> > > > CREATE TABLE orders (
> > > > order_id INT,
> > > > price DECIMAL(6, 2),
> > > > currency_id INT,
> > > > order_time AS NOW(),
> > > > WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.order_id.kind' = 'sequence',
> > > > 'fields.order_id.start' = '1',
> > > > 'fields.order_id.end' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TABLE currency_rates (
> > > > currency_id INT,
> > > > conversion_rate DECIMAL(4, 3),
> > > > PRIMARY KEY (currency_id) NOT ENFORCED
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW max_rates AS (
> > > > SELECT
> > > > currency_id,
> > > > MAX(conversion_rate) AS max_rate
> > > > FROM currency_rates
> > > > GROUP BY currency_id
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW temporal_join AS (
> > > > SELECT
> > > > order_id,
> > > > max_rates.max_rate
> > > > FROM orders
> > > >  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > > >  ON orders.currency_id = max_rates.currency_id
> > > > );
> > > >
> > > > SELECT * FROM temporal_join;
> > > >
> > > > Am I missing something? What would be a good starting point to
> address this?
> > > >
> > > > Thanks in advance,
> > > > Sébastien Chevalley
>


Re: Flink Prometheus Connector question

2024-02-23 Thread Feng Jin
My understanding is that, following the design in the FLIP and based on the Prometheus Remote-Write API v1.0,
you can implement an initial SinkFunction that writes to Prometheus.


Best,
Feng
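
For illustration, a hedged sketch of a simple sink that pushes values to a Prometheus Pushgateway using the Prometheus Java simpleclient (simpleclient_pushgateway dependency); this is a simpler alternative to the remote-write approach from the FLIP, and the gauge name, job name, and Pushgateway address are assumptions:

```java
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.PushGateway;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Receives (metric name, value) pairs and pushes one gauge sample per record. */
public class PushGatewaySink extends RichSinkFunction<Tuple2<String, Double>> {
    private transient CollectorRegistry registry;
    private transient Gauge gauge;
    private transient PushGateway pushGateway;

    @Override
    public void open(Configuration parameters) {
        registry = new CollectorRegistry();
        gauge = Gauge.build()
                .name("flink_business_metric")               // assumption: metric name
                .help("Metric produced by the Flink job")
                .labelNames("metric_name")
                .register(registry);
        pushGateway = new PushGateway("pushgateway:9091");   // assumption: Pushgateway address
    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        gauge.labels(value.f0).set(value.f1);
        pushGateway.pushAdd(registry, "flink-metrics-job");  // assumption: Pushgateway job label
    }
}
```

Pushing once per record is chatty; in practice you would batch pushes (for example on a timer or on checkpoint) or implement the remote-write protocol as FLIP-312 proposes.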

On Fri, Feb 23, 2024 at 7:36 PM 17610775726 <17610775...@163.com> wrote:

> Hi
> See the official documentation:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/#prometheuspushgateway
>
>
> Best
> JasonLee
>
>
>  Original message 
> | From | casel.chen |
> | Date | 2024-02-23 17:35 |
> | To | user-zh@flink.apache.org |
> | Subject | Flink Prometheus Connector question |
> Scenario: use Flink to generate metrics in real time and write them to Prometheus for monitoring and alerting.
> I found the https://github.com/apache/flink-connector-prometheus project online, but it is empty.
> I also found FLIP-312, which is about a Flink Prometheus connector:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
> Has the Flink community released an official Flink Prometheus connector?
> If I need to write to Prometheus in real time right now, what is the recommended approach? Thanks!


RE: Not all the task slots are used. Are we missing a setting somewhere?

2024-02-23 Thread Schwalbe Matthias
Thought it would be something like that.

Jean-Marc, in future, please ‘reply all’ for your answer such that the 
community can see it as well 

Welcome anyway to the community

Thias



From: Jean-Marc Paulin 
Sent: Friday, February 23, 2024 1:14 PM
To: Schwalbe Matthias 
Subject: Re: Not all the task slots are used. Are we missing a setting 
somewhere?



Ah, found our mistake..

Yes we do set the parallelism as part of the option we pass when we submit the 
job in the first place.

Sorry for the bother  Thias.

JM


From: Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>>
Sent: Friday, February 23, 2024 10:21
To: Jean-Marc Paulin mailto:j...@uk.ibm.com>>; 
user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: [EXTERNAL] RE: Not all the task slots are used. Are we missing a 
setting somewhere?



Hi Jean-Marc,



In the absence of more context, did you adjust the parallelism of your job 
accordingly?



Thias



From: Jean-Marc Paulin mailto:j...@uk.ibm.com>>
Sent: Friday, February 23, 2024 11:06 AM
To: user@flink.apache.org
Subject: Q: Not all the task slots are used. Are we missing a setting somewhere?



Hi,



We used to run with 3 task managers with numberOfTaskSlots = 2. So all together 
we had 6 task slots and our application used them all. Trying to increase 
throughput, we increased the number of task managers to 6. So now we have 12 
task slots all together. However our application still only uses 6 task slots, 
so we have 6 that are unused. Is there a setting I am missing somewhere ?



Thanks



JM



Reply: Flink Prometheus Connector question

2024-02-23 Thread 17610775726
Hi 
See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/#prometheuspushgateway


Best
JasonLee


 Original message 
| From | casel.chen |
| Date | 2024-02-23 17:35 |
| To | user-zh@flink.apache.org |
| Subject | Flink Prometheus Connector question |
Scenario: use Flink to generate metrics in real time and write them to Prometheus for monitoring and alerting.
I found the https://github.com/apache/flink-connector-prometheus project online, but it is empty.
I also found FLIP-312, which is about a Flink Prometheus connector:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
Has the Flink community released an official Flink Prometheus connector?
If I need to write to Prometheus in real time right now, what is the recommended approach? Thanks!

RE: Not all the task slots are used. Are we missing a setting somewhere?

2024-02-23 Thread Schwalbe Matthias
Hi Jean-Marc,

In the absence of more context, did you adjust the parallelism of your job 
accordingly?

Thias
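
For completeness, a minimal sketch of raising the default job parallelism so the extra slots can actually be used (the value 12 matches the 6 x 2 slot layout described in the quoted question; a lower value forced at submission time or per operator still wins):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 6 task managers x 2 slots = 12 slots; a default parallelism of 12 occupies all of them.
        env.setParallelism(12);
    }
}
```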

From: Jean-Marc Paulin 
Sent: Friday, February 23, 2024 11:06 AM
To: user@flink.apache.org
Subject: Q: Not all the task slots are used. Are we missing a setting somewhere?

Hi,

We used to run with 3 task managers with numberOfTaskSlots = 2. So all together 
we had 6 task slots and our application used them all. Trying to increase 
throughput, we increased the number of task managers to 6. So now we have 12 
task slots all together. However our application still only uses 6 task slots, 
so we have 6 that are unused. Is there a setting I am missing somewhere ?

Thanks

JM


Q: Not all the task slots are used. Are we missing a setting somewhere?

2024-02-23 Thread Jean-Marc Paulin
Hi,

We used to run with 3 task managers with numberOfTaskSlots = 2. So all together 
we had 6 task slots and our application used them all. Trying to increase 
throughput, we increased the number of task managers to 6. So now we have 12 
task slots all together. However our application still only uses 6 task slots, 
so we have 6 that are unused. Is there a setting I am missing somewhere ?

Thanks

JM



Flink Prometheus Connector question

2024-02-23 Thread casel.chen
Scenario: use Flink to generate metrics in real time and write them to Prometheus for monitoring and alerting.
I found the https://github.com/apache/flink-connector-prometheus project online, but it is empty.
I also found FLIP-312, which is about a Flink Prometheus connector:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
Has the Flink community released an official Flink Prometheus connector?
If I need to write to Prometheus in real time right now, what is the recommended approach? Thanks!

Schema Evolution & Json Schemas

2024-02-22 Thread Salva Alcántara
I'm facing some issues related to schema evolution in combination with the
usage of Json Schemas and I was just wondering whether there are any
recommended best practices.

In particular, I'm using the following code generator:

- https://github.com/joelittlejohn/jsonschema2pojo

Main gotchas so far relate to the `additionalProperties` field. When
setting that to true, the resulting POJO is not valid according to Flink
rules because the generated getter/setter methods don't follow the java
beans naming conventions, e.g., see here:

- https://github.com/joelittlejohn/jsonschema2pojo/issues/1589

This means that the Kryo fallback is used for serialization purposes, which
is not only bad for performance but also breaks state schema evolution.

So, because of that, setting `additionalProperties` to `false` looks like a
good idea but then your job will break if an upstream/producer service adds
a property to the messages you are reading. To solve this problem, the
POJOs for your job (as a reader) can be generated to ignore the
`additionalProperties` field (via the `@JsonIgnore` Jackson annotation).
This seems to be a good overall solution to the problem, but looks a bit
convoluted to me / didn't come without some trial & error (= pain &
frustration).

Is there anyone here facing similar issues? It would be good to hear your
thoughts on this!

BTW, this is very interesting article that touches on the above mentioned
difficulties:
-
https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html
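
For illustration, a hand-written sketch of what a reader-side POJO can look like; the class and field names are invented, and class-level @JsonIgnoreProperties(ignoreUnknown = true) is used here as one way to get the "ignore extra upstream fields" behaviour described above (the generated-code variant annotates the additionalProperties member with @JsonIgnore instead):

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Unknown/extra JSON properties added by upstream producers are silently dropped on read.
@JsonIgnoreProperties(ignoreUnknown = true)
public class SensorReading {
    private String id;
    private double temperature;

    public SensorReading() {}  // public no-arg constructor: required by Flink's POJO serializer

    // Bean-style getters/setters keep the class a valid Flink POJO, so there is no Kryo
    // fallback and state schema evolution stays available.
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }
}
```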


Re: Temporal join on rolling aggregate

2024-02-22 Thread mayaming1983
+1 for supporting defining time attributes on views.

I once encountered the same problem as yours. I did some regular joins and lost the 
time attribute, and hence I could no longer do window operations in the subsequent 
logic. I had to output the joined view to Kafka, read from it again, and 
define a watermark on the new source - a cumbersome workaround.

It would be more flexible if we could control time attribute / watermark on 
views, just as if it's some kind of special source.

Thanks,
Yaming
On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> Posting this to dev as well as it potentially has some implications on 
> development effort.
>
> What seems to be the problem here is that we cannot control/override 
> Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you 
> cannot create a PRIMARY KEY on the view but I think the temporal join also 
> should not require the PK, should we remove this limitation?
>
> The general problem is the inflexibility of the timestamp/watermark handling 
> on query outputs, which makes this again impossible.
>
> The workaround here can be to write the rolling aggregate to Kafka, read it 
> back again and join with that. The fact that this workaround is possible 
> actually highlights the need for more flexibility on the query/view side in 
> my opinion.
>
> Has anyone else run into this issue and considered the proper solution to the 
> problem? Feels like it must be pretty common :)
>
> Cheers,
> Gyula
>
>
>
>
> > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley  
> > wrote:
> > > Hi,
> > >
> > > I have been trying to write a temporal join in SQL done on a rolling 
> > > aggregate view. However it does not work and throws :
> > >
> > > org.apache.flink.table.api.ValidationException: Event-Time Temporal Table 
> > > Join requires both primary key and row time attribute in versioned table, 
> > > but no row time attribute can be found.
> > >
> > > It seems that after the aggregation, the table loses the watermark and 
> > > it's not possible to add one with the SQL API as it's a view.
> > >
> > > CREATE TABLE orders (
> > >     order_id INT,
> > >     price DECIMAL(6, 2),
> > >     currency_id INT,
> > >     order_time AS NOW(),
> > >     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.order_id.kind' = 'sequence',
> > >         'fields.order_id.start' = '1',
> > >         'fields.order_id.end' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TABLE currency_rates (
> > >     currency_id INT,
> > >     conversion_rate DECIMAL(4, 3),
> > >     PRIMARY KEY (currency_id) NOT ENFORCED
> > > )
> > >     WITH (
> > >         'connector' = 'datagen',
> > >         'rows-per-second' = '10',
> > >         'fields.currency_id.min' = '1',
> > >         'fields.currency_id.max' = '20'
> > >     );
> > >
> > > CREATE TEMPORARY VIEW max_rates AS (
> > >     SELECT
> > >         currency_id,
> > >         MAX(conversion_rate) AS max_rate
> > >     FROM currency_rates
> > >     GROUP BY currency_id
> > > );
> > >
> > > CREATE TEMPORARY VIEW temporal_join AS (
> > >     SELECT
> > >         order_id,
> > >         max_rates.max_rate
> > >     FROM orders
> > >          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > >          ON orders.currency_id = max_rates.currency_id
> > > );
> > >
> > > SELECT * FROM temporal_join;
> > >
> > > Am I missing something? What would be a good starting point to address 
> > > this?
> > >
> > > Thanks in advance,
> > > Sébastien Chevalley


Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Hi again Dominik,

So I was able to verify that this particular layout was being copied into
the image within the Dockerfile (specifically into
/flink/lib/log4j-layout-template-json-2.17.1.jar). Typically we've copied
over the actual jar that was built in the image to the appropriate volume
for the FlinkDeployment so that I can point to a clean location as seen
below as an initContainer for the deployment:

podTemplate:
  apiVersion: v1
  kind: Pod
  metadata:
name: task-manager-pod-template
  spec:
initContainers:
  - name: copy-job-contents
image: {{ $.Values.flinkImage }}
volumeMounts:
  - mountPath: /opt/flink/jars
name: jars
command: ["/bin/sh", "-c"]
args:
- cp /opt/flink-vvp-image/my-jar.jar /opt/flink/jars/my-jar.jar

I've tried to adjust the initContainer to copy over the jar to the lib
directory in a similar manner to be available for the operator, but I'm
still running into the same error that was previously seen via something
like:

- cp $myImage/flink/lib/log4j-layout-template-json-2.17.1.jar
$flinkImage/flink/lib/log4j-layout-template.json-2.17.1.jar

However I feel like in this case I'd need to share the lib directory for
the image running the job to copy it over from the original image. Does
that make sense? Is there a better way to handle this in this scenario?

On Thu, Feb 22, 2024 at 6:31 AM Rion Williams  wrote:

> Correct! Building a custom image for the deployment and then copying over
> the jar to a specific directory for the FlinkDeployment to use (as the
> image contains the legacy Flink jobs/jars as well as those newer ones for
> the operator).
>
> On Feb 22, 2024, at 6:18 AM, dominik.buen...@swisscom.com wrote:
>
> 
>
> Hi Rion
>
>
>
> I guess you’re building your own docker image for the deployment right?
>
>
>
> For switching to Logback I’m doing the following command (sbt-docker) when
> building the image.
>
>
>
> val eclasspath = (*Compile */ *externalDependencyClasspath*).value
> val logbackClassicJar = eclasspath.files.find(file =>
> file.getName.contains("logback-classic"))
> logbackClassicJar.foreach(logback => add(logback, «opt/flink/lib»))
>
>
>
> Given the error message that you provided I think the dependency is
> missing in the lib folder (don’t confuse this with the usrlib folder).
>
>
>
> Kind Regards
>
> Dominik
>
> *From: *Rion Williams 
> *Date: *Thursday, 22 February 2024 at 13:09
> *To: *Bünzli Dominik, INI-DNA-INF 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Using Custom JSON Formatting with Flink Operator
>
>
>
>
> Hi Dominick,
>
>
>
> In this case the jobs are running using application-mode. All of these
> were previously working as expected for the legacy jobs using the same
> configuration (however those were running via Ververica Platform and
> targeting Flink 1.15.2). I had somewhat expected similar behaviors but it
> seems there’s something that is missing.
>
>
>
> Thanks,
>
>
>
> Rion
>
>
>
> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
>
> 
>
> Good morning Rion,
>
>
>
> Are you in session job mode or application mode? I’ve had some similar
> issues (logback) lately and it turned out that I also needed to add the
> additional dependencies (I guess JsonTemplateLayout is one of them) to
> the lib folder of the deployment.
>
>
>
> Kind regards
>
> Dominik
>
>
>
> *From: *Rion Williams 
> *Date: *Thursday, 22 February 2024 at 00:46
> *To: *Flink User List 
> *Subject: *Using Custom JSON Formatting with Flink Operator
>
>
>
>
>
> Hey Flinkers,
>
> Recently I’ve been in the process of migrating a series of older Flink
> jobs to use the official operator and have run into a snag on the logging
> front.
>
> I’ve attempted to use the following configuration for the job:
>
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
>
> However once the job begins running, I’m met with the following errors in
> the logs:
>
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class
> org.apache.logging.log4j.core.appender.ConsoleAppender for element Console:
> java.lang.NullPointerException java.lang.NullPointerException
> ```
>
> I believe that all of the appropriate references are correct in the actual
> shaded jar itself as I can see things like the JsonTemplateLayout inside of
> it (under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as
> well as the GcpLayout that I’m targeting in the 

[ANNOUNCE] Apache flink-connector-jdbc 3.1.2 released

2024-02-22 Thread Sergey Nuyanzin
The Apache Flink community is very happy to announce the release of Apache
flink-connector-jdbc 3.1.2. This release is compatible with
Apache Flink 1.16, 1.17 and 1.18.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354088

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


RE: Flink - OpenSearch connector query

2024-02-22 Thread Praveen Chandna via user
Hello

Could you please share how to proceed? I need to create an index with 
custom settings (number of shards, replicas, etc.).

Thanks !!

From: Praveen Chandna via user 
Sent: Wednesday, February 21, 2024 3:57 PM
To: Praveen Chandna via user 
Subject: Flink - OpenSearch connector query

Hello

As per the OpenSearch connector documentation, OpensearchEmitter can be used to 
perform requests of different types i.e., IndexRequest, DeleteRequest, 
UpdateRequest etc.

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/opensearch/#configuring-the-internal-bulk-processor

I need to implement CreateIndexRequest (create an index), which is supported by the 
OpenSearch client. How can I create an index through the Flink OpenSearch connector?

Thanks !!

// Regards
Praveen Chandna



Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Correct! Building a custom image for the deployment and then copying over the 
jar to a specific directory for the FlinkDeployment to use (as the image 
contains the legacy Flink jobs/jars as well as those newer ones for the 
operator).

> On Feb 22, 2024, at 6:18 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Hi Rion
>  
> I guess you’re building your own docker image for the deployment right?
>  
> For switching to Logback I’m doing the following command (sbt-docker) when 
> building the image.
>  
> val eclasspath = (Compile / externalDependencyClasspath).value
> val logbackClassicJar = eclasspath.files.find(file => 
> file.getName.contains("logback-classic"))
> logbackClassicJar.foreach(logback => add(logback, «opt/flink/lib»))
>  
> Given the error message that you provided I think the dependency is missing 
> in the lib folder (don’t confuse this with the usrlib folder).
>  
> Kind Regards
> Dominik
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 13:09
> To: Bünzli Dominik, INI-DNA-INF 
> Cc: user@flink.apache.org 
> Subject: Re: Using Custom JSON Formatting with Flink Operator
> 
>  
> Hi Dominick,
>  
> In this case the jobs are running using application-mode. All of these were 
> previously working as expected for the legacy jobs using the same 
> configuration (however those were running via Ververica Platform and 
> targeting Flink 1.15.2). I had somewhat expected similar behaviors but it 
> seems there’s something that is missing.
>  
> Thanks,
>  
> Rion
> 
> 
> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Good morning Rion,
>  
> Are you in session job mode or application mode? I’ve had some similar issues 
> (logback) lately and it turned out that I also needed to add the additional 
> dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
> the deployment.
>  
> Kind regards
> Dominik
>  
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 00:46
> To: Flink User List 
> Subject: Using Custom JSON Formatting with Flink Operator
> 
> 
> Be aware: This is an external email.
> 
> 
> 
> Hey Flinkers,
> 
> Recently I’ve been in the process of migrating a series of older Flink jobs 
> to use the official operator and have run into a snag on the logging front.
> 
> I’ve attempted to use the following configuration for the job:
> 
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
> 
> However once the job begins running, I’m met with the following errors in the 
> logs:
> 
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class 
> org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
> java.lang.NullPointerException java.lang.NullPointerException
> ```
> 
> I believe that all of the appropriate references are correct in the actual 
> shaded jar itself as I can see things like the JsonTemplateLayout inside of 
> it (under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well 
> as the GcpLayout that I’m targeting in the root of the shaded jar as well 
> (including trying several adjustments to shade exceptions, adding a log4j 
> specific shade transformer, etc.)
> 
> I’ve tried adjusting several different knobs/configurations but I’m still 
> continually getting this same error. I’d be happy to share any additional 
> configuration for the job any/or the FlinkDeployment where applicable.
> 
> Just a bit stumped here on something that feels like it should just work.


Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Dominik.Buenzli
Hi Rion

I guess you’re building your own docker image for the deployment right?

For switching to Logback I’m doing the following command (sbt-docker) when 
building the image.

val eclasspath = (Compile / externalDependencyClasspath).value
val logbackClassicJar = eclasspath.files.find(file => 
file.getName.contains("logback-classic"))
logbackClassicJar.foreach(logback => add(logback, "opt/flink/lib"))

Given the error message that you provided I think the dependency is missing in 
the lib folder (don’t confuse this with the usrlib folder).

Kind Regards
Dominik
From: Rion Williams 
Date: Thursday, 22 February 2024 at 13:09
To: Bünzli Dominik, INI-DNA-INF 
Cc: user@flink.apache.org 
Subject: Re: Using Custom JSON Formatting with Flink Operator
Be aware: This is an external email.

Hi Dominick,

In this case the jobs are running using application-mode. All of these were 
previously working as expected for the legacy jobs using the same configuration 
(however those were running via Ververica Platform and targeting Flink 1.15.2). 
I had somewhat expected similar behaviors but it seems there’s something that 
is missing.

Thanks,

Rion


On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:

Good morning Rion,

Are you in session job mode or application mode? I’ve had some similar issues 
(logback) lately and it turned out that I also needed to add the additional 
dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
the deployment.

Kind regards
Dominik

From: Rion Williams 
Date: Thursday, 22 February 2024 at 00:46
To: Flink User List 
Subject: Using Custom JSON Formatting with Flink Operator

Be aware: This is an external email.



Hey Flinkers,

Recently I’ve been in the process of migrating a series of older Flink jobs to 
use the official operator and have run into a snag on the logging front.

I’ve attempted to use the following configuration for the job:

```
logConfiguration:
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
...
appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
```

However once the job begins running, I’m met with the following errors in the 
logs:

```
ERROR Unable to locate plugin type for JsonTemplateLayout
ERROR Unable to locate plugin for JsonTemplateLayout
ERROR Could not create plugin of type class 
org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
java.lang.NullPointerException java.lang.NullPointerException
```

I believe that all of the appropriate references are correct in the actual 
shaded jar itself as I can see things like the JsonTemplateLayout inside of it 
(under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well as 
the GcpLayout that I’m targeting in the root of the shaded jar as well 
(including trying several adjustments to shade exceptions, adding a log4j 
specific shade transformer, etc.)

I’ve tried adjusting several different knobs/configurations but I’m still 
continually getting this same error. I’d be happy to share any additional 
configuration for the job any/or the FlinkDeployment where applicable.

Just a bit stumped here on something that feels like it should just work.


Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Hi Dominick,

In this case the jobs are running using application-mode. All of these were 
previously working as expected for the legacy jobs using the same configuration 
(however those were running via Ververica Platform and targeting Flink 1.15.2). 
I had somewhat expected similar behaviors but it seems there’s something that 
is missing.

Thanks,

Rion

> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Good morning Rion,
>  
> Are you in session job mode or application mode? I’ve had some similar issues 
> (logback) lately and it turned out that I also needed to add the additional 
> dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
> the deployment.
>  
> Kind regards
> Dominik
>  
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 00:46
> To: Flink User List 
> Subject: Using Custom JSON Formatting with Flink Operator
> 
> 
> Be aware: This is an external email.
> 
> 
> 
> Hey Flinkers,
> 
> Recently I’ve been in the process of migrating a series of older Flink jobs 
> to use the official operator and have run into a snag on the logging front.
> 
> I’ve attempted to use the following configuration for the job:
> 
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
> 
> However once the job begins running, I’m met with the following errors in the 
> logs:
> 
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class 
> org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
> java.lang.NullPointerException java.lang.NullPointerException
> ```
> 
> I believe that all of the appropriate references are correct in the actual 
> shaded jar itself as I can see things like the JsonTemplateLayout inside of 
> it (under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well 
> as the GcpLayout that I’m targeting in the root of the shaded jar as well 
> (including trying several adjustments to shade exceptions, adding a log4j 
> specific shade transformer, etc.)
> 
> I’ve tried adjusting several different knobs/configurations but I’m still 
> continually getting this same error. I’d be happy to share any additional 
> configuration for the job any/or the FlinkDeployment where applicable.
> 
> Just a bit stumped here on something that feels like it should just work.


Re: Temporal join on rolling aggregate

2024-02-22 Thread Gyula Fóra
Posting this to dev as well as it potentially has some implications on
development effort.

What seems to be the problem here is that we cannot control/override
Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
cannot create a PRIMARY KEY on the view, but I think the temporal join also
should not require the PK. Should we remove this limitation?

The general problem is the inflexibility of the timestamp/watermark
handling on query outputs, which makes this again impossible.

The workaround here can be to write the rolling aggregate to Kafka, read it
back again and join with that. The fact that this workaround is possible
actually highlights the need for more flexibility on the query/view side in
my opinion.
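
For reference, a minimal sketch of the Kafka round-trip workaround mentioned above, expressed
through the Table API's executeSql. The upsert-kafka options, topic name and the use of the
record-timestamp metadata column are assumptions; if your connector version does not expose that
metadata, carry an explicit update_time column in the payload instead.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinWorkaround {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // orders and currency_rates are assumed to be registered as in the original post.

        // 1) Materialize the rolling aggregate into an upsert-kafka topic.
        tEnv.executeSql(
                "CREATE TABLE max_rates_sink ("
                        + "  currency_id INT,"
                        + "  max_rate DECIMAL(4, 3),"
                        + "  PRIMARY KEY (currency_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'max-rates',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json')");
        tEnv.executeSql(
                "INSERT INTO max_rates_sink "
                        + "SELECT currency_id, MAX(conversion_rate) FROM currency_rates GROUP BY currency_id");

        // 2) Read the topic back as a versioned table with a primary key and a row time.
        tEnv.executeSql(
                "CREATE TABLE max_rates_versioned ("
                        + "  currency_id INT,"
                        + "  max_rate DECIMAL(4, 3),"
                        + "  update_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,"
                        + "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,"
                        + "  PRIMARY KEY (currency_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'max-rates',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json')");

        // 3) Event-time temporal join against the versioned table.
        tEnv.executeSql(
                "SELECT o.order_id, r.max_rate "
                        + "FROM orders AS o "
                        + "LEFT JOIN max_rates_versioned FOR SYSTEM_TIME AS OF o.order_time AS r "
                        + "ON o.currency_id = r.currency_id")
            .print();
    }
}
```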

Has anyone else run into this issue and considered the proper solution to
the problem? Feels like it must be pretty common :)

Cheers,
Gyula




On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
wrote:

> Hi,
>
> I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However it does not work and throws :
>
> org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> Join requires both primary key and row time attribute in versioned table,
> but no row time attribute can be found.
>
> It seems that after the aggregation, the table loses the watermark and
> it's not possible to add one with the SQL API as it's a view.
>
> CREATE TABLE orders (
> order_id INT,
> price DECIMAL(6, 2),
> currency_id INT,
> order_time AS NOW(),
> WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.order_id.kind' = 'sequence',
> 'fields.order_id.start' = '1',
> 'fields.order_id.end' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TABLE currency_rates (
> currency_id INT,
> conversion_rate DECIMAL(4, 3),
> PRIMARY KEY (currency_id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TEMPORARY VIEW max_rates AS (
> SELECT
> currency_id,
> MAX(conversion_rate) AS max_rate
> FROM currency_rates
> GROUP BY currency_id
> );
>
> CREATE TEMPORARY VIEW temporal_join AS (
> SELECT
> order_id,
> max_rates.max_rate
> FROM orders
>  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
>  ON orders.currency_id = max_rates.currency_id
> );
>
> SELECT * FROM temporal_join;
>
> Am I missing something? What would be a good starting point to address
> this?
>
> Thanks in advance,
> Sébastien Chevalley


Re: Unsubscribe

2024-02-22 Thread Leonard Xu
You can unsubscribe from the user-zh@flink.apache.org mailing list by sending an 
email with any content to user-zh-unsubscr...@flink.apache.org. For managing your 
subscription to the mailing list, see [1].

Best regards,
[1] https://flink.apache.org/zh/what-is-flink/community/

> On Feb 20, 2024, at 4:36 PM, 任香帅 wrote:
> 
> Unsubscribe



Re:Evenly distributing events with same key

2024-02-22 Thread Xuyang
Hi, Dominik.
For data skew, I think you can refer to the tuning and optimization ideas in 
Flink SQL [1] and implement them manually through the DataStream API. If it is 
simple processing logic and aggregation operations, you can even use the Flink 
SQL API directly. In particular, the way you manually add rolling numbers now is 
essentially what the automatic split-distinct optimization does [2].
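
If you do go the Flink SQL route, a minimal sketch of what the linked tuning options look like 
in code follows. The option keys are the ones documented in [1][2]; the values are illustrative, 
and the split-distinct rewrite only applies to distinct aggregates such as COUNT(DISTINCT ...).

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SkewTuningExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Mini-batching is a prerequisite for local-global aggregation.
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().set("table.exec.mini-batch.size", "5000");

        // Local-global aggregation: pre-aggregate locally, then merge per key (two-phase).
        tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "TWO_PHASE");

        // Automatic split-distinct rewrite, the built-in counterpart of the manual rolling-number key.
        tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled", "true");

        // Example query shape (table and column names are placeholders):
        // tEnv.executeSql(
        //     "SELECT field_name, window_start, COUNT(*) AS cnt "
        //         + "FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(rowtime), INTERVAL '30' MINUTES)) "
        //         + "GROUP BY field_name, window_start");
    }
}
```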


> I see that some taskmanagers handle substantially more load than others (few 
> million records difference)


IIUC, this could be because the combining operator you use afterwards always 
shuffles this data to the same task manager. You can examine the processing 
throughput of each vertex in the Flink UI by observing the 'records received' 
metric, to check whether any node other than this aggregation node is causing 
data skew.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#performance-tuning
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation

--

Best!
Xuyang




At 2024-02-22 15:44:26, ""  wrote:

Hi all,

 

I am currently facing the problem of having a pipeline (DataStream API) where I 
need to split a GenericRecord into its fields and then aggregate all the values 
of a particular field into 30 minute windows. Therefore, if I were to use only 
a keyBy field name, I would send all the values of a field to the same parallel 
instance of the cluster. This is bad as the stream is quite large (15k 
events/second). What I want to achieve is a more even distribution of events 
across the different taskmanagers.

 

Currently, I assign a "rolling" number (between 0 and maximum parallelism) to 
the field name as a secondary key component and use this combination as keyBy. 
This leads to "partitioned" events, which I have to recombine in a second step 
by using only the field name of the composite key.

 

 

I tested this approach and it works but when looking at the Flink WebUI, I see 
that some taskmanagers handle substantially more load than others (few million 
records difference). I also had a look at the partitionCustom() but this 
doesn’t work for KeyedStreams right? Did someone else face a related issue? Any 
suggestions how I can distribute events with the same key more evenly?

 

Kind Regards

Dominik

 

 

Evenly distributing events with same key

2024-02-21 Thread Dominik.Buenzli
Hi all,

I am currently facing the problem of having a pipeline (DataStream API) where I 
need to split a GenericRecord into its fields and then aggregate all the values 
of a particular field into 30-minute windows. Therefore, if I keyed only by the 
field name, all values of that field would be sent to the same parallel 
instance of the cluster. This is bad, as the stream is quite large (15k 
events/second). What I want to achieve is a more even distribution of events 
across the different taskmanagers.

Currently, I assign a "rolling" number (between 0 and maximum parallelism) to 
the field name as a secondary key component and use this combination as keyBy. 
This leads to "partitioned" events, which I have to recombine in a second step 
by using only the field name of the composite key.
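
For illustration, a minimal sketch of the two-step pattern described above (pre-aggregate on a 
composite key with a random bucket number, then combine per field name). All types, names and 
window sizes are placeholders, and timestamps/watermarks are assumed to be assigned upstream.

```java
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStepAggregation {

    private static final int BUCKETS = 32; // spread one hot field over this many buckets

    public static DataStream<Tuple2<String, Double>> aggregate(
            DataStream<Tuple2<String, Double>> fieldValues) { // (fieldName, value)

        // Step 1: pre-aggregate per (fieldName, bucket) so a single hot field is spread out.
        DataStream<Tuple3<String, Integer, Double>> partial =
                fieldValues
                        .map(t -> Tuple3.of(t.f0, ThreadLocalRandom.current().nextInt(BUCKETS), t.f1))
                        .returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE))
                        .keyBy(t -> t.f0 + "#" + t.f1)
                        .window(TumblingEventTimeWindows.of(Time.minutes(30)))
                        .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));

        // Step 2: recombine the partial results per field name only.
        return partial
                .map(t -> Tuple2.of(t.f0, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
    }
}
```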


I tested this approach and it works, but when looking at the Flink WebUI, I see 
that some taskmanagers handle substantially more load than others (a few million 
records difference). I also had a look at partitionCustom(), but that doesn't 
work for KeyedStreams, right? Has someone else faced a related issue? Any 
suggestions on how I can distribute events with the same key more evenly?

Kind Regards
Dominik




Re:Re:Re:Re: How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?

2024-02-21 Thread Xuyang
Hi, 
> So to compute the sink latency, is it enough to just use context.currentProcessingTime() - context.timestamp()?
Yes. For the details, you can refer to this internally implemented operator [1].


> The SinkWriter.Context in the new Sink
> V2 no longer has a currentProcessingTime() method, so can I just use System.currentTimeMillis()
>  - context.timestamp() to get the sink latency?
That should work. Because the machine clocks of the different TaskManagers may differ slightly, it will not be perfectly accurate, but it should be good enough.


[1] 
https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314
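
To make that concrete, a minimal sketch of measuring the lag in a legacy SinkFunction and 
exposing it as a histogram metric. The metric name and the histogram implementation are 
illustrative; with the Sink V2 API, System.currentTimeMillis() would replace 
context.currentProcessingTime(), as discussed above.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;

public class LatencyMeasuringSink extends RichSinkFunction<RowData> {

    private transient Histogram sinkLagMillis;

    @Override
    public void open(Configuration parameters) {
        // Register a histogram so the lag shows up in the metrics system.
        sinkLagMillis =
                getRuntimeContext()
                        .getMetricGroup()
                        .histogram("sinkLagMillis", new DescriptiveStatisticsHistogram(1000));
    }

    @Override
    public void invoke(RowData row, Context context) {
        Long eventTime = context.timestamp(); // record timestamp, if the stream carries one
        if (eventTime != null) {
            sinkLagMillis.update(context.currentProcessingTime() - eventTime);
        }
        // ... actual write logic ...
    }
}
```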




--

Best!
Xuyang





On 2024-02-21 15:17:49, "casel.chen" wrote:
>Thanks! So to compute the sink latency, is it enough to just use context.currentProcessingTime() - context.timestamp()?
>I see that the SinkWriter.Context in the new Sink
>V2 no longer has a currentProcessingTime() method, so can I just use System.currentTimeMillis()
> - context.timestamp() to get the sink latency?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2024-02-21 09:41:37, "Xuyang" wrote:
>>Hi, chen. 
>>You can try using the following in the sink function's invoke method:
>>
>>
>>@Override
>>public void invoke(RowData row, Context context) throws Exception {
>>context.currentProcessingTime(); 
>>context.currentWatermark(); 
>>...
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>
>>On 2024-02-20 19:38:44, "Feng Jin" wrote:
>>>My understanding is that you should not get them from rowData; you can obtain the watermark and eventTime from the Context.
>>>
>>>Best,
>>>Feng
>>>
>>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>>
 How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?


 public class XxxSinkFunction extends RichSinkFunction implements
 CheckpointedFunction, CheckpointListener {


 @Override
 public synchronized void invoke(RowData rowData, Context context)
 throws IOException {
//  Here I want to get the event time and watermark values from rowData. How can this be done?
 }
 }


 For example, the source table is defined as follows


 CREATE TEMPORARY TABLE source_table(
   username varchar,
   click_url varchar,
   eventtime varchar,

   ts AS TO_TIMESTAMP(eventtime),
   WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- define the watermark for the rowtime
 ) with (
   'connector'='kafka',
   ...

 );


 CREATE TEMPORARY TABLE sink_table(
   username varchar,
   click_url varchar,
   eventtime varchar
 ) with (
   'connector'='xxx',
   ...
 );
 insert into sink_table select username,click_url,eventtime from
 source_table;


Re: Using Custom JSON Formatting with Flink Operator

2024-02-21 Thread Dominik.Buenzli
Good morning Rion,

Are you in session job mode or application mode? I’ve had some similar issues 
(logback) lately and it turned out that I also needed to add the additional 
dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
the deployment.

Kind regards
Dominik

From: Rion Williams 
Date: Thursday, 22 February 2024 at 00:46
To: Flink User List 
Subject: Using Custom JSON Formatting with Flink Operator

Be aware: This is an external email.



Hey Flinkers,

Recently I’ve been in the process of migrating a series of older Flink jobs to 
use the official operator and have run into a snag on the logging front.

I’ve attempted to use the following configuration for the job:

```
logConfiguration:
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
...
appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
```

However once the job begins running, I’m met with the following errors in the 
logs:

```
ERROR Unable to locate plugin type for JsonTemplateLayout
ERROR Unable to locate plugin for JsonTemplateLayout
ERROR Could not create plugin of type class 
org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
java.lang.NullPointerException java.lang.NullPointerException
```

I believe that all of the appropriate references are correct in the actual 
shaded jar itself as I can see things like the JsonTemplateLayout inside of it 
(under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well as 
the GcpLayout that I’m targeting in the root of the shaded jar as well 
(including trying several adjustments to shade exceptions, adding a log4j 
specific shade transformer, etc.)

I’ve tried adjusting several different knobs/configurations but I’m still 
continually getting this same error. I’d be happy to share any additional 
configuration for the job any/or the FlinkDeployment where applicable.

Just a bit stumped here on something that feels like it should just work.


Re: Unsubscribe

2024-02-21 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe from emails from the
user-zh@flink.apache.org mailing list.

Best,
Zhanghao Chen

From: 曹明勤 
Sent: Thursday, February 22, 2024 9:42
To: user-zh@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Unsubscribe

2024-02-21 Thread 曹明勤
Unsubscribe

Using Custom JSON Formatting with Flink Operator

2024-02-21 Thread Rion Williams
Hey Flinkers,

Recently I’ve been in the process of migrating a series of older Flink jobs to 
use the official operator and have run into a snag on the logging front.

I’ve attempted to use the following configuration for the job:

```
logConfiguration:
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
...
appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
```

However once the job begins running, I’m met with the following errors in the 
logs:

```
ERROR Unable to locate plugin type for JsonTemplateLayout
ERROR Unable to locate plugin for JsonTemplateLayout
ERROR Could not create plugin of type class 
org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
java.lang.NullPointerException java.lang.NullPointerException
```

I believe that all of the appropriate references are correct in the actual 
shaded jar itself as I can see things like the JsonTemplateLayout inside of it 
(under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well as 
the GcpLayout that I’m targeting in the root of the shaded jar as well 
(including trying several adjustments to shade exceptions, adding a log4j 
specific shade transformer, etc.)

I’ve tried adjusting several different knobs/configurations but I’m still 
continually getting this same error. I’d be happy to share any additional 
configuration for the job any/or the FlinkDeployment where applicable. 

Just a bit stumped here on something that feels like it should just work.

Temporal join on rolling aggregate

2024-02-21 Thread Sébastien Chevalley
Hi,

I have been trying to write a temporal join in SQL done on a rolling aggregate 
view. However it does not work and throws :

org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join 
requires both primary key and row time attribute in versioned table, but no row 
time attribute can be found.

It seems that after the aggregation, the table loses the watermark and it's 
not possible to add one with the SQL API as it's a view.

CREATE TABLE orders (
order_id INT,
price DECIMAL(6, 2),
currency_id INT,
order_time AS NOW(),
WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_id.kind' = 'sequence',
'fields.order_id.start' = '1',
'fields.order_id.end' = '10',
'fields.currency_id.min' = '1',
'fields.currency_id.max' = '20'
);

CREATE TABLE currency_rates (
currency_id INT,
conversion_rate DECIMAL(4, 3),
PRIMARY KEY (currency_id) NOT ENFORCED
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.currency_id.min' = '1',
'fields.currency_id.max' = '20'
);

CREATE TEMPORARY VIEW max_rates AS (
SELECT
currency_id,
MAX(conversion_rate) AS max_rate
FROM currency_rates
GROUP BY currency_id
);

CREATE TEMPORARY VIEW temporal_join AS (
SELECT
order_id,
max_rates.max_rate
FROM orders
 LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
 ON orders.currency_id = max_rates.currency_id
);

SELECT * FROM temporal_join;

Am I missing something? What would be a good starting point to address this?

Thanks in advance,
Sébastien Chevalley

Flink Scala Positions in India or USA !

2024-02-21 Thread sri hari kali charan Tummala
Hi Folks,

I am currently seeking full-time (non-consulting) Flink/Scala positions in India
or the USA, specifically at the Principal or Staff level.

I require an H-1B transfer and assistance with relocation from India; my
I-140 is approved.

Thanks & Regards
Sri Tummala


Flink - OpenSearch connector query

2024-02-21 Thread Praveen Chandna via user
Hello

As per the OpenSearch connector documentation, OpensearchEmitter can be used to 
perform requests of different types i.e., IndexRequest, DeleteRequest, 
UpdateRequest etc.

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/opensearch/#configuring-the-internal-bulk-processor

I need to issue a CreateIndexRequest (create index), which is supported by the 
OpenSearch client. How can I implement index creation through the Flink 
OpenSearch connector?

Thanks !!

// Regards
Praveen Chandna



Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
Thanks Thias and Zakelly,

I probably muddied the waters saying that my use case was similar to
kvCache.
What I was calling "non serializable state" is actually a Random Cut Forest
ML model that cannot be serialized by itself, but you can extract a
serializable state. That is serializable, but definitely not a
primitive type.
To be specific, I am trying to implement a keyed version of this RCF
operator [1]. I need one RCF model (a separate "forest") per key. Key
cardinality is not very high, and the size of the state should not be
a problem.

I guess the only feasible way is what Zakelly is suggesting, using
reflection to extract and set the keyContext from within processElement().
I will explore this option.

Thanks again
Lorenzo

[1]
https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java


On Wed, 21 Feb 2024 at 08:13, Schwalbe Matthias 
wrote:

> Good morning all,
>
>
>
> Let me loop myself in …
>
>
>
>1. Another even more convenient way to enable cache is to actually
>configure/assign RocksDB to use more off-heap memory for cache, you also
>might consider enabling bloom filters  (all depends on how large you
>key-space is (thousands/millions/billions/…)
>
> Within the technological limits, RocksDB is hard to top, if keeping all
> data in memory is no option, this is the path I usually follow.
>
>1. The other question on how to control the current-key from within
>snapshot state: you can acquire a pointer to the underlying state backend
>e.g. from within open() and the get hold of a pointer of the specific state
>primitive, and set the current key directly.
>
> In order to find out how to do that, put a breakpoint in debugger and walk
> up a couple of call stack frames, and/or walk into the value setters and
> model after how it is done there.
>
> Mind though, to restore the current key, if you happen to change it to
> another key.
>
> Doing this e.g. in initializeState() is time-insensitive, because this
> happens outside the ‘hot’ code paths.
>
>1. If the number of elements to store is small, you can store it in
>operator state and initialize your local structure in initializeState()
>from it, you probably would want to keep the data in serialized form in
>operator state, since you mentioned, serialization would be expensive.
>2. There is another API (which I don’t remember the name of) that
>allows you to store operator state as BLOB directly if that would be a
>doable option for you.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Zakelly Lan 
> *Sent:* Wednesday, February 21, 2024 8:04 AM
> *To:* Lorenzo Nicora 
> *Cc:* Flink User Group 
> *Subject:* Re: Preparing keyed state before snapshot
>
>
>
> ⚠ EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠
>
>
>
> Hi Lorenzo,
>
>
>
> I think the most convenient way is to modify the code of the state
> backend, adding a k-v cache as you want.
>
>
>
> Otherwise IIUC, there's no public interface to get keyContext. But well,
> you may try something hacky. You may use the passed-in `Context` instance
> in processElement, and leverage java reflection to get
> the KeyedProcessOperator instance, where you can perform setCurrentKey().
>
>
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
> wrote:
>
> Thanks Zakelly,
>
>
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Fucntion.
>
>
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
>
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFuction in StreamExecRank.
>
> How can I do something similar without modifying the Flink runtime?
>
>
>
> Lorenzo
>
>
>
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
> Hi Lorenzo,
>
>
>
> It is not recommended to do this with the keyed state. However there is an
> example in flink code (FastTop1Function#snapshotState) [1] of setting keys
> when snapshotState().
>
>
>
> Hope this helps.
>
>
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
> wrote:
>
> Hi Thias
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the state
> with state.update(...) .
> This must happen per key,
>
> But snapshotState() has no visibility of the keys. And I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something

RE: Preparing keyed state before snapshot

2024-02-21 Thread Schwalbe Matthias
Good morning all,

Let me loop myself in …


  1.  Another even more convenient way to enable cache is to actually 
configure/assign RocksDB to use more off-heap memory for cache, you also might 
consider enabling bloom filters (all depends on how large your key-space is 
(thousands/millions/billions/…)
Within the technological limits, RocksDB is hard to top, if keeping all data in 
memory is no option, this is the path I usually follow.

  2.  The other question on how to control the current-key from within snapshot 
state: you can acquire a pointer to the underlying state backend e.g. from 
within open() and then get hold of a pointer to the specific state primitive, 
and set the current key directly.
In order to find out how to do that, put a breakpoint in debugger and walk up a 
couple of call stack frames, and/or walk into the value setters and model after 
how it is done there.
Mind though, to restore the current key, if you happen to change it to another 
key.
Doing this e.g. in initializeState() is time-insensitive, because this happens 
outside the ‘hot’ code paths.

  1.  If the number of elements to store is small, you can store it in operator 
state and initialize your local structure in initializeState() from it, you 
probably would want to keep the data in serialized form in operator state, 
since you mentioned, serialization would be expensive.
  2.  There is another API (which I don’t remember the name of) that allows you 
to store operator state as BLOB directly if that would be a doable option for 
you.

Sincere greetings

Thias




From: Zakelly Lan 
Sent: Wednesday, February 21, 2024 8:04 AM
To: Lorenzo Nicora 
Cc: Flink User Group 
Subject: Re: Preparing keyed state before snapshot

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend, 
adding a k-v cache as you want.

Otherwise IIUC, there's no public interface to get keyContext. But well, you 
may try something hacky. You may use the passed-in `Context` instance in 
processElement, and leverage java reflection to get the KeyedProcessOperator 
instance, where you can perform setCurrentKey().


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
mailto:lorenzo.nic...@gmail.com>> wrote:
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable 
"state", similar to the kvCache in FastTop1Fucntion.

But I am not sure I understand how I can set the keyed state for a specific 
key, in snapshotState().
FastTop1Function seems to rely on keyContext set via setKeyContext(). This 
method is not part of the API. I see it's set specifically for 
AbstractTopNFuction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan 
mailto:zakelly@gmail.com>> wrote:
Hi Lorenzo,

It is not recommended to do this with the keyed state. However there is an 
example in flink code (FastTop1Function#snapshotState) [1] of setting keys when 
snapshotState().

Hope this helps.

[1] 
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
mailto:lorenzo.nic...@gmail.com>> wrote:
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting the 
in-memory "state" of each key and putting it in the state with 
state.update(...) .
This must happen per key,
But snapshotState() has no visibility of the keys. And I have no way of 
selectively accessing the state of a specific key to update it.
Unless I am missing something

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:
Good morning Lorenzo,

You may want to implement 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in 
your KeyedProcessFunction.
Btw. By the time initializeState(…) is called, the state backend is fully 
initialized and can be read and written to (which is not the case for when the 
open(…) function is called.
In initializeState(…) you also get access to state of different operator key.
SnapshotState(…) is called as part of the (each) checkpoint in order to store 
data.

Sincere greetings

Thias

From: Lorenzo Nicora mailto:lorenzo.nic...@gmail.com>>
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group mailto:user@flink.apache.org>>
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are expensive.

How to measure the end-to-end latency of a Flink SQL job

2024-02-20 Thread casel.chen
A Flink SQL job consumes canal-json messages coming from MySQL via Kafka, applies complex 
processing, and writes the result to Doris. How can I measure the end-to-end latency of the records in the Doris table? The MySQL table has an update_time column that represents the business update time of a record.
On the Doris side, an ingest_time column can be added to the table schema, so the end-to-end latency can be computed on the Doris table as 
ingest_time - update_time. However, that only works for offline statistics. Is there a way to compute it in real time, so that it can be monitored in real time?
I looked at the invoke method of the SinkFunction class: although its Context parameter exposes the current processing time and the event time, most sinks buffer records into micro-batches before writing, so directly subtracting these two timestamps does not reflect the real latency of landing in the database. Is there a way to obtain the latency precisely?

Re:Re:Re: How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?

2024-02-20 Thread casel.chen
Thanks! So to compute the sink latency, is it enough to just use context.currentProcessingTime() - context.timestamp()?
I see that the SinkWriter.Context in the new Sink
V2 no longer has a currentProcessingTime() method, so can I just use System.currentTimeMillis()
 - context.timestamp() to get the sink latency?














On 2024-02-21 09:41:37, "Xuyang" wrote:
>Hi, chen. 
>You can try using the following in the sink function's invoke method:
>
>
>@Override
>public void invoke(RowData row, Context context) throws Exception {
>context.currentProcessingTime(); 
>context.currentWatermark(); 
>...
>}
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>On 2024-02-20 19:38:44, "Feng Jin" wrote:
>>My understanding is that you should not get them from rowData; you can obtain the watermark and eventTime from the Context.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>
>>> How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>> @Override
>>> public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>//  Here I want to get the event time and watermark values from rowData. How can this be done?
>>> }
>>> }
>>>
>>>
>>> For example, the source table is defined as follows
>>>
>>>
>>> CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- define the watermark for the rowtime
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;


Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend,
adding a k-v cache as you want.

Otherwise IIUC, there's no public interface to get keyContext. But well,
you may try something hacky. You may use the passed-in `Context` instance
in processElement, and leverage java reflection to get
the KeyedProcessOperator instance, where you can perform setCurrentKey().
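
Purely as an illustration of that hack, a heavily hedged sketch follows. It depends on internal, 
non-public details (the Context passed to processElement being a non-static inner class of 
KeyedProcessOperator, reachable via the synthetic this$0 field), so it may break across Flink 
versions and is not an officially supported approach.

```java
import java.lang.reflect.Field;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

public abstract class OperatorAwareFunction<K, I, O> extends KeyedProcessFunction<K, I, O> {

    protected transient AbstractStreamOperator<?> operator;

    // Call this once from processElement(value, ctx, out) to capture the enclosing operator.
    protected void captureOperator(Context ctx) throws Exception {
        if (operator == null) {
            // "this$0" is the synthetic reference to the enclosing operator instance (internal detail).
            Field outer = ctx.getClass().getDeclaredField("this$0");
            outer.setAccessible(true);
            operator = (AbstractStreamOperator<?>) outer.get(ctx);
        }
    }

    // Later, e.g. from a CheckpointedFunction#snapshotState implementation, one could do:
    //   operator.setCurrentKey(key);          // switch the keyed-state context to 'key'
    //   flinkState.update(serializedModelForKey);
    // remembering to restore the previous key afterwards.
}
```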


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
wrote:

> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Fucntion.
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFuction in StreamExecRank.
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
>> Hi Lorenzo,
>>
>> It is not recommended to do this with the keyed state. However there is
>> an example in flink code (FastTop1Function#snapshotState) [1] of setting
>> keys when snapshotState().
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>>
>> Best,
>> Zakelly
>>
>> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
>> wrote:
>>
>>> Hi Thias
>>>
>>> I considered CheckpointedFunction.
>>> In snapshotState() I would have to update the state of each key,
>>> extracting the in-memory "state" of each key and putting it in the state
>>> with state.update(...) .
>>> This must happen per key,
>>> But snapshotState() has no visibility of the keys. And I have no way of
>>> selectively accessing the state of a specific key to update it.
>>> Unless I am missing something
>>>
>>> Thanks
>>> Lorenzo
>>>
>>>
>>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
 Good morning Lorenzo,



 You may want to implement
 org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
 your KeyedProcessFunction.

 Btw. By the time initializeState(…) is called, the state backend is
 fully initialized and can be read and written to (which is not the case for
 when the open(…) function is called.

 In initializeState(…) you also get access to state of different
 operator key.

 SnapshotState(…) is called as part of the (each) checkpoint in order to
 store data.



 Sincere greetings



 Thias



 *From:* Lorenzo Nicora 
 *Sent:* Thursday, February 15, 2024 7:50 PM
 *To:* Flink User Group 
 *Subject:* Preparing keyed state before snapshot



 Hello everyone,



 I have a convoluted problem.



 I am implementing a KeyedProcessFunction that keeps some
 non-serializable "state" in memory, in a transient Map (key = stream key,
 value = the non-serializable "state").



 I can extract a serializable representation to put in Flink state, and
 I can load my in-memory "state" from the Flink state. But these operations
 are expensive.



 Initializing the in-memory "state" is relatively easy. I do it lazily,
 in processElement(), on the first record for the key.



 The problem is saving the in-memory "state" to Flink state.

 I need to do it only before the state snapshot. But
 KeyedProcessFunction has no entrypoint called before the state snapshot.

 I cannot use CheckpointedFunction.snapshotState(), because it does not
 work for keyed state.



 Any suggestions?



 Note that I cannot use operator state nor a broadcast state.

 Processing is keyed. Every processed record modifies the in-memory
 "state" of that key. If the job rescale, the state of the key must follow
 the partition.





 Regards

 Lorenzo

 This message is intended only for the named recipient and may contain
 confidential or privileged information. As the confidentiality of 

[ANNOUNCE] Apache Kyuubi 1.8.1 is available

2024-02-20 Thread Cheng Pan
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.8.1 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC and
RESTful interfaces for end-users to manipulate large-scale data with
pre-programmed and extensible Spark/Flink/Trino/Hive engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark/Flink/Trino/Hive engines on the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.8.1.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
Cheng Pan, on behalf of Apache Kyuubi community


Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-20 Thread Daniel Peled
Hello Guys,

Can someone please assist us regarding the following issue?

We have noticed that when we add a new Kafka sink operator to the graph and
start from the last savepoint, the operator is 100% busy for several
minutes and even 1/2-1 hour!

The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:

org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer

private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    return producer;
}


Since we added a new sink operator, the lastCheckpointId is 1,
and if, for example, the checkpointId is 2,
the loop will be executed 2 times!


We have several questions:
1. Is this behaviour expected ?
2. Are we doing something wrong ?
3. Is there a way to avoid this behavior ?

Best Regards,
Danny


Re:Re: How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?

2024-02-20 Thread Xuyang
Hi, chen. 
You can try using the following in the sink function's invoke method:


@Override
public void invoke(RowData row, Context context) throws Exception {
context.currentProcessingTime(); 
context.currentWatermark(); 
...
}







--

Best!
Xuyang





On 2024-02-20 19:38:44, "Feng Jin" wrote:
>My understanding is that you should not get them from rowData; you can obtain the watermark and eventTime from the Context.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>
>> How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>> @Override
>> public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>//  Here I want to get the event time and watermark values from rowData. How can this be done?
>> }
>> }
>>
>>
>> For example, the source table is defined as follows
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- define the watermark for the rowtime
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;


Re: Support for ConfigMap for Runtime Arguments in Flink Kubernetes Operator

2024-02-20 Thread Surendra Singh Lilhore
Hi Arjun,

Yes, direct support for external configuration files within the Flink
ConfigMap is somewhat restricted. Currently, the operator simply copies two
local files into it.

Please check: FlinkConfMountDecorator#getLocalLogConfFiles()


You can try a pod template to mount an external ConfigMap.

Thanks
Surendra

On Mon, Feb 19, 2024 at 11:17 PM arjun s  wrote:

> Hi team,
>
> I am currently in the process of deploying Flink on Kubernetes using the
> Flink Kubernetes Operator and have encountered a scenario where I need to
> pass runtime arguments to my Flink application from a properties file.
> Given the dynamic nature of Kubernetes environments and the need for
> flexibility in configuration management, I was wondering if the Flink
> Kubernetes Operator supports the use of Kubernetes ConfigMaps for this
> purpose. Specifically, I am interested in understanding:
>
> 1.How can I use a ConfigMap to pass runtime arguments or configurations
> stored in a properties file to a Flink job deployed using the Kubernetes
> operator?
> 2.Are there best practices or recommended approaches for managing
> application-specific configurations, such as database connections or other
> external resource settings, using ConfigMaps with the Flink Kubernetes
> Operator?
> 3.If direct support for ConfigMaps is not available or limited, could you
> suggest any workarounds or alternative strategies that align with Flink's
> deployment model on Kubernetes?
>
> I appreciate any guidance or documentation you could provide on this
> matter, as it would greatly assist in streamlining our deployment process
> and maintaining configuration flexibility in our Flink applications.
>
> Thank you for your time and support. I look forward to your response.
>


Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable
"state", similar to the kvCache in FastTop1Fucntion.

But I am not sure I understand how I can set the keyed state for a specific
key, in snapshotState().
FastTop1Function seems to rely on keyContext set via setKeyContext(). This
method is not part of the API. I see it's set specifically for
AbstractTopNFuction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:

> Hi Lorenzo,
>
> It is not recommended to do this with the keyed state. However there is an
> example in flink code (FastTop1Function#snapshotState) [1] of setting keys
> when snapshotState().
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
> wrote:
>
>> Hi Thias
>>
>> I considered CheckpointedFunction.
>> In snapshotState() I would have to update the state of each key,
>> extracting the in-memory "state" of each key and putting it in the state
>> with state.update(...) .
>> This must happen per key,
>> But snapshotState() has no visibility of the keys. And I have no way of
>> selectively accessing the state of a specific key to update it.
>> Unless I am missing something
>>
>> Thanks
>> Lorenzo
>>
>>
>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Good morning Lorenzo,
>>>
>>>
>>>
>>> You may want to implement
>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>> your KeyedProcessFunction.
>>>
>>> Btw. By the time initializeState(…) is called, the state backend is
>>> fully initialized and can be read and written to (which is not the case for
>>> when the open(…) function is called.
>>>
>>> In initializeState(…) you also get access to state of different operator
>>> key.
>>>
>>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>>> store data.
>>>
>>>
>>>
>>> Sincere greetings
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>> *From:* Lorenzo Nicora 
>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>> *To:* Flink User Group 
>>> *Subject:* Preparing keyed state before snapshot
>>>
>>>
>>>
>>> Hello everyone,
>>>
>>>
>>>
>>> I have a convoluted problem.
>>>
>>>
>>>
>>> I am implementing a KeyedProcessFunction that keeps some
>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>> value = the non-serializable "state").
>>>
>>>
>>>
>>> I can extract a serializable representation to put in Flink state, and I
>>> can load my in-memory "state" from the Flink state. But these operations
>>> are expensive.
>>>
>>>
>>>
>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>> in processElement(), on the first record for the key.
>>>
>>>
>>>
>>> The problem is saving the in-memory "state" to Flink state.
>>>
>>> I need to do it only before the state snapshot. But KeyedProcessFunction
>>> has no entrypoint called before the state snapshot.
>>>
>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>> work for keyed state.
>>>
>>>
>>>
>>> Any suggestions?
>>>
>>>
>>>
>>> Note that I cannot use operator state nor a broadcast state.
>>>
>>> Processing is keyed. Every processed record modifies the in-memory
>>> "state" of that key. If the job rescale, the state of the key must follow
>>> the partition.
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Lorenzo
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>


Help in designing the Flink usecase

2024-02-20 Thread neha goyal
Classification: External

Hi,

I have a use case involving calculating the lifetime order count of a
customer in real-time. To reduce the memory footprint, I plan to run a
batch job on stored data every morning (let's say at 5 am) to calculate the
total order count up to that moment. Additionally, I aim to deploy a Flink
job that reads from Kafka and provides the updated order count in real time
after 5 am. Is it possible for this job to combine
the data from the batch job with the new order counts to provide a
consistently accurate total count?
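
One possible shape for this, sketched below under the assumption that the daily batch result can
be fed in as a second keyed stream of (customerId, baseCount) records: keep the base count and a
live delta in keyed state and emit their sum. Deduplication around the 5 am cut-off (for example,
resetting the delta when a fresh base arrives and only counting orders stamped after the cut-off)
is deliberately left out here and is the part that needs the most care.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class LifetimeOrderCount
        extends KeyedCoProcessFunction<String, Tuple2<String, Long>, String, Tuple2<String, Long>> {
    // input 1: (customerId, baseCountAsOfSnapshot) from the morning batch job
    // input 2: customerId of each new order arriving from Kafka

    private transient ValueState<Long> baseCount;
    private transient ValueState<Long> delta;

    @Override
    public void open(Configuration parameters) {
        baseCount = getRuntimeContext().getState(new ValueStateDescriptor<>("base", Types.LONG));
        delta = getRuntimeContext().getState(new ValueStateDescriptor<>("delta", Types.LONG));
    }

    @Override
    public void processElement1(Tuple2<String, Long> snapshot, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        baseCount.update(snapshot.f1); // refresh the base from the latest batch result
        out.collect(Tuple2.of(snapshot.f0, snapshot.f1 + current(delta)));
    }

    @Override
    public void processElement2(String customerId, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        delta.update(current(delta) + 1); // one more live order for this customer
        out.collect(Tuple2.of(customerId, current(baseCount) + current(delta)));
    }

    private static long current(ValueState<Long> s) throws Exception {
        Long v = s.value();
        return v == null ? 0L : v;
    }
}
```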

Please let me know if you have solved this use case earlier or any idea on
how to proceed.

