Re:Re: Running Flink SQL in production

2024-03-07 Thread Xuyang
Hi. 
Hmm, please correct me if I'm mistaken: using the SQL Client might not be very 
convenient for those who need to verify the results of a submission, such as 
checking for exceptions related to submission failures, and so on.

--

Best!
Xuyang

At 2024-03-07 17:32:07, "Robin Moffatt" wrote:

Thanks for the reply. 
In terms of production, my thinking is you'll have your SQL in a file under 
code control. Whether that SQL ends up getting submitted via an invocation of 
SQL Client with -f or via REST API seems moot. WDYT? 

On Thu, 7 Mar 2024 at 01:53, Xuyang  wrote:

Hi, IMO both the SQL Client and the REST API provide connections to the 
SQL Gateway service for submitting jobs. A slight difference is that the SQL 
Client also offers a command-line interface for users to view results. 
In your production scenario, placing the SQL to be submitted into a file and then 
using the '-f' option of the SQL Client to submit that file sounds a bit 
roundabout. Why not just use the REST API to submit the statements directly?
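As an illustration, a rough, untested sketch of scripting such a submission (the 
v1 endpoints, the JSON payloads, and the default port 8083 are assumptions based 
on the SQL Gateway REST API documentation):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitViaSqlGateway {
    // Assumption: SQL Gateway reachable locally on its default REST port.
    private static final String GATEWAY = "http://localhost:8083";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1) Open a session.
        HttpResponse<String> session = client.send(
                HttpRequest.newBuilder(URI.create(GATEWAY + "/v1/sessions"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString("{}"))
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        // Crude extraction of the handle from {"sessionHandle": "..."}.
        Matcher m = Pattern.compile("\"sessionHandle\"\\s*:\\s*\"([^\"]+)\"")
                .matcher(session.body());
        if (!m.find()) {
            throw new IllegalStateException("Unexpected response: " + session.body());
        }

        // 2) Submit a statement, e.g. the contents of a SQL file kept under source control.
        HttpResponse<String> operation = client.send(
                HttpRequest.newBuilder(URI.create(
                                GATEWAY + "/v1/sessions/" + m.group(1) + "/statements"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(
                                "{\"statement\": \"SELECT 1\"}"))
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        // The response carries an operationHandle that can be polled for status/results.
        System.out.println(operation.body());
    }
}
```

The returned operation handle is also what lets a deployment script check whether 
the submission actually succeeded.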

--

Best!
Xuyang

At 2024-03-07 04:11:01, "Robin Moffatt via user"  wrote:

I'm reading the deployment guide[1] and wanted to check my understanding. For 
deploying a SQL job into production, would the pattern be to write the SQL in a 
file that's under source control, and pass that file to SQL Client with the -f 
argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API? 


Are there pros and cons to each approach?


thanks, Robin


[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files

TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so - how can I report it? If not - what can I try to 
do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce is below. I expect this code to always print `None` for the 
state value (the TTL is 1 second and each element sleeps for 1.5 seconds), but it 
prints `None` only for the first element and the stored state value afterwards.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(
                cleanup_size=10,
                run_cleanup_for_every_record=True,
            )
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = (
        StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
    )

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute("ttl_test")
```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {


    private transient ValueState<String> state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        var value = state.value();
        System.out.println("State: " + value);

        if (value == null) {
            value = LocalDateTime.now().toString();
            state.update(value);
        }

        Thread.sleep(1500);
    }
}
```


Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that the Java code prints `State: Null`, `State: Null`, ... as I was 
expecting, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka wrote:
> [quoted text trimmed]


Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Jad Naous
Hi,
The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
possible to use them with SQL?
Thanks,
Jad Naous

Grepr, CEO/Founder



Re: Handling late events with Table API / SQL

2024-03-07 Thread Sunny S
Thanks for the response! It's a pity that side output for late data is not 
supported in the Table API and SQL. I will start the discussions regarding this.

In the meantime, I am trying to use the built-in function 
CURRENT_WATERMARK(rowtime) to be able to collect late data. The scenario I have 
is: I am creating a table with the Kafka connector and defining the watermark in 
that table (the table definition can be found in the mail above). 
Next, I apply a tumbling window SQL query on this table. I want to collect the 
late data for this window operation. I am not clear how the CURRENT_WATERMARK 
function would help me in getting the late data for the window operator.

Also, I am a bit confused regarding the way we determine if an event is late 
for a window operator. From the WindowOperator code :

protected boolean isElementLate(StreamRecord<IN> element) {
    return (windowAssigner.isEventTime())
            && (element.getTimestamp() + allowedLateness
                    <= internalTimerService.currentWatermark());
}

it seems the operator maintains a currentWatermark. I am trying to understand 
how this currentWatermark changes between the time the operator receives the 
first event belonging to a window and the time that window fires.

Please help me understand these.

Thanks


From: Feng Jin 
Sent: 06 March 2024 07:08
To: Sunny S 
Cc: user@flink.apache.org 
Subject: Re: Handling late events with Table API / SQL


You can use the CURRENT_WATERMARK(rowtime) function for some filtering; please 
refer to [1] for details.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
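For example, a rough, untested sketch that routes rows already behind the 
watermark into a separate sink. It assumes a TableEnvironment `tEnv`, your 
`some_table` definition with the watermark on `event_timestamp`, and a 
`late_events` sink table created elsewhere:

```java
// CURRENT_WATERMARK(event_timestamp) is NULL until the first watermark has been
// emitted, hence the IS NOT NULL guard.
tEnv.executeSql(
        "INSERT INTO late_events "
                + "SELECT * FROM some_table "
                + "WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL "
                + "  AND event_timestamp < CURRENT_WATERMARK(event_timestamp)");
```

The windowed aggregation would then add the complementary predicate, so it only 
sees rows the watermark has not yet passed.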

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S wrote:
Hi,

I am using Flink SQL to create a table something like this :

CREATE TABLE some-table (
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)

I want to understand how I can deal with late / out-of-order events when using 
Flink SQL / the Table API. How can I collect the late / out-of-order events to a 
side output with the Table API / SQL?

Thanks


Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Junrui Lee
Hi Jad,

You can refer to the CREATE FUNCTION section (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
and the Table Aggregate Functions section (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
for details on creating and using these functions.

Best regards,
Junrui

Jad Naous wrote on Thu, Mar 7, 2024 at 22:19:

> Hi,
> The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
> possible to use them with SQL?
> Thanks,
> Jad Naous
> 
> Grepr, CEO/Founder
>
>


Fwd: Flink Checkpoint & Offset Commit

2024-03-07 Thread Jacob Rollings
Hello,

I am implementing proof-of-concept Flink real-time streaming solutions.

I came across the lines below in the out-of-the-box Flink Kafka connector
documentation.


https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/

Consumer Offset Committing:

"Kafka source commits the current consuming offset when checkpoints
are completed, for ensuring the consistency between Flink’s checkpoint
state and committed offsets on Kafka brokers."


How is Flink able to control the callbacks from checkpointing? Is there a
way to override this in my own implementations? I have multiple upstream
sources to connect to, depending on the business model, which are not Kafka.
Based on the criticality of the system and publisher dependencies, we cannot
switch to Kafka for these. So I was hoping to do the same thing the Kafka
connector is doing.


Cheers,

JR


Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Jad Naous
Hi Junrui,
Thank you for the pointer. I had read that page, and I can use the function
with the Java Table API ok, but I'm trying to use the Top2 accumulator with
a SQL function. I can't use a left lateral join on it since the planner
fails with "not a table function". I don't think a join is the right thing
anyway, since it's an aggregation table function.

tEnv.createTemporaryFunction("TOP2", Top2.class);
>
> var calculated2 = tEnv.sqlQuery(
> "SELECT " +
> "  TUMBLE_START(ts, INTERVAL '1' SECOND) as w_start, " +
> "  TUMBLE_END(ts, INTERVAL '1' SECOND) as w_end, " +
> "  TUMBLE_ROWTIME(ts, INTERVAL '1' SECOND) as w_rowtime, " +
> "  id, " +
> "  top1, " +
> "  top2 " +
> "FROM " +
> "  source " +
> "  LEFT JOIN LATERAL TABLE(TOP2(val)) ON TRUE " +
> "GROUP BY " +
> "  TUMBLE(ts, INTERVAL '1' SECOND), " +
> "  id"
> ).printExplain();
>

Gives the following:

   org.apache.flink.table.api.ValidationException: SQL validation failed.
> Function 'default_catalog.default_database.TOP2' cannot be used as a table
> function.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
> at
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> app//org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
> at
> app//io.grepr.query.MetricsTableApiTest.test(MetricsTableApiTest.java:129)
> Caused by:
> org.apache.flink.table.api.ValidationException: Function
> 'default_catalog.default_database.TOP2' cannot be used as a table function.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.verifyFunctionKind(FunctionCatalogOperatorTable.java:200)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:133)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:126)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.base@11.0.22
> /java.util.Optional.flatMap(Optional.java:294)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:100)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1310)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:993)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)


Jad Naous

Grepr, CEO/Founder



On Thu, Mar 7, 2024 at 9:43 AM Junrui Lee  wrote:

> [quoted text trimmed]


Re: Re: Running Flink SQL in production

2024-03-07 Thread Feng Jin
Hi,

If you need to use Flink SQL in a production environment, I think it would
be better to use the Table API [1] and package it into a jar.
Then submit the jar to the cluster environment.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#sql
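For example, a minimal sketch of that approach (the datagen/print connectors here
are just placeholders; in practice the DDL/DML would come from your SQL files
under source control):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Built into a jar and submitted with `flink run` (or the Kubernetes/YARN equivalents).
public class SqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder source/sink so the sketch is self-contained.
        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT) WITH ("
                        + "'connector' = 'datagen', 'rows-per-second' = '1')");
        tEnv.executeSql("CREATE TABLE dst (id BIGINT) WITH ('connector' = 'print')");

        // Submits the INSERT as a Flink job.
        tEnv.executeSql("INSERT INTO dst SELECT id FROM src");
    }
}
```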

Best,
Feng

On Thu, Mar 7, 2024 at 9:56 PM Xuyang wrote:
> [quoted text trimmed]


Re:Re: Handling late events with Table API / SQL

2024-03-07 Thread Xuyang
Hi, Sunny.
Each watermark comes from one input subtask of this window operator, and the 
operator keeps track of the latest watermark received from every input subtask. 
The `currentWatermark` in the window operator is the minimum of these per-subtask 
watermarks, so it advances as watermarks arrive on all inputs; for example, with 
two input subtasks whose latest watermarks are 10:00:05 and 10:00:12, the 
operator's currentWatermark is 10:00:05. The window fires once currentWatermark 
passes the window's end timestamp.

--

Best!
Xuyang




At 2024-03-07 23:03:39, "Sunny S" wrote:
> [quoted text trimmed]

Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread xia rui
Hi Jacob.

Flink uses a "notification" mechanism to let an operator react to the completion
of a checkpoint. After gathering all checkpoint-done messages from the TMs, the JM
sends a "notify checkpoint completed" RPC to all TMs. Operators handle this
notification, and that is where checkpoint-success callbacks are invoked; for
example, the Kafka source commits the current consuming offset there. I think this doc (
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
may be helpful.

You can override `notifyCheckpointComplete()` to customize the behavior on
checkpoint completion.
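For a non-Kafka source/sink, a minimal sketch (class name and method bodies are
placeholders, not Flink's Kafka implementation) of hooking into that notification
via the CheckpointListener interface:

```java
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Stages writes and only acknowledges them to the external system once the
// checkpoint that covers them has completed, mirroring what the Kafka source
// does with offsets.
public class AckOnCheckpointSink<T> extends RichSinkFunction<T> implements CheckpointListener {

    @Override
    public void invoke(T value, Context context) {
        // stage/write the record to the external system (placeholder)
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // invoked after the JM's "notify checkpoint completed" RPC reaches this task;
        // commit/acknowledge everything staged up to this checkpoint here (placeholder)
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // optional: discard anything staged for an aborted checkpoint (placeholder)
    }
}
```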

Best regards Rui Xia

On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings wrote:
> [quoted text trimmed]


Re:Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Xuyang
Hi, Jad.
IIUC, TableAggregateFunction is not yet supported in SQL. The original FLIP [1] 
only implements it in the Table API. You can send an email to the dev mailing 
list for more details and create an improvement JIRA [2] for it.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
[2] https://issues.apache.org/jira/projects/FLINK/issues
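For reference, a rough sketch of the Table API route that does work today, 
reusing the `source` table, `id`/`val` columns, and the Top2 function from the 
snippet earlier in this thread (a plain groupBy instead of the tumbling window, 
for brevity):

```java
// Assumes the same TableEnvironment `tEnv` and Top2 class as in the quoted code, plus
// import static org.apache.flink.table.api.Expressions.$;
// import static org.apache.flink.table.api.Expressions.call;
Table result = tEnv.from("source")
        .groupBy($("id"))
        .flatAggregate(call(Top2.class, $("val")).as("top_value", "top_rank"))
        .select($("id"), $("top_value"), $("top_rank"));

result.execute().print();
```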



--

Best!
Xuyang




At 2024-03-08 03:12:19, "Jad Naous" wrote:
> [quoted text trimmed]

Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread Yanfei Lei
Hi Jacob,

> I have multiple upstream sources to connect to depending on the business 
> model which are not Kafka. Based on criticality of the system and publisher 
> dependencies, we cannot switch to Kafka for these.

Sounds like you want to implement some custom connectors; [1] and [2] may
be helpful for implementing a custom Flink Table API connector.

Specifically, in terms of "Flink Checkpoint & Offset Commit", the custom
source needs to implement the `SourceReader` interface, and you can
override `snapshotState()` and `notifyCheckpointComplete()` in your
implementation.
[3] is the related code of the Kafka connector under the DataStream API, and
[4] is the related code of the Kafka connector under the Table API & SQL.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] 
https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354

xia rui wrote on Fri, Mar 8, 2024 at 10:12:
> [quoted text trimmed]



-- 
Best,
Yanfei