________________________________ 发件人: faronzz <faro...@163.com> 发送时间: 2023年8月26日 22:12 收件人: user-zh@flink.apache.org <user-zh@flink.apache.org> 主题: pyflink aggfunction in window TVF cannot sink to connector='kafka' — it reports the sink is consuming update changes; a Java aggfunction and Flink built-in aggfunctions such as sum are OK
hi~ I came across a problem I didn't understand,I can't use pyflink aggfuction function properly in window tvf, The following are available: java aggfuntion flink system aggfunction window (not window tvf) I want to know if this is a bug or if I'm using it the wrong way? pyflink 1.17.1 flink 1.17.1 from datetime import datetime, timedelta from pyflink.table import AggregateFunction from pyflink.common.typeinfo import Types from pyflink.common import Row from pyflink.table import Schema, StreamTableEnvironment from pyflink.datastream import StreamExecutionEnvironment class Sum0(AggregateFunction): def get_value(self, accumulator): return accumulator[0] def create_accumulator(self): return Row(0) def accumulate(self, accumulator, *args): if args[0] is not None: accumulator[0] += args[0] def retract(self, accumulator, *args): if args[0] is not None: accumulator[0] -= args[0] def merge(self, accumulator, accumulators): for acc in accumulators: accumulator[0] += acc[0] def get_result_type(self): return "BIGINT" def get_accumulator_type(self): return 'ROW' def test_py_udf_kafka(): # 流模式 env = StreamExecutionEnvironment.get_execution_environment() env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar") env.set_parallelism(1) table_env = StreamTableEnvironment.create(stream_execution_environment=env) ds = env.from_collection( collection=[(1, 2, "Lee", datetime.now() - timedelta(hours=4)), (2, 3, "Lee", datetime.now() - timedelta(hours=4)), (3, 4, "Jay", datetime.now() - timedelta(hours=4)), (5, 6, "Jay", datetime.now() - timedelta(hours=2)), (7, 8, "Lee", datetime.now())], type_info=Types.ROW([Types.INT(), Types.INT(), Types.STRING(), Types.SQL_TIMESTAMP()])) table_schema = Schema.new_builder() \ .column("f0", "INT") \ .column("f1", "INT") \ .column("f2", "STRING") \ .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))") \ .watermark("rowtime", "rowtime - INTERVAL '1' SECOND") \ .build() ts = table_env.from_data_stream(ds, table_schema) \ 
.alias("value", "count", "name", "rowtime") print("打印源表结构") ts.print_schema() sql_sink_dll_1 = """CREATE TABLE kafka_test( `name` string, `agg_data` bigint) with ( 'connector' = 'kafka', 'topic'='test_java2', 'properties.bootstrap.servers'='agent3:9092', 'value.format' = 'json' );""" table_env.execute_sql(sql_sink_dll_1) table_env.create_temporary_view("source", ts) table_env.create_temporary_function( "sum_udf", Sum0()) sql_query_system = """ select name,sum(`value`) as agg_data from TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS)) group by window_start, window_end, name """ sql_query = """ select name,sum_udf(`value`) as agg_data from TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS)) group by window_start, window_end, name """ print(table_env.explain_sql(sql_query)) table_env.sql_query(sql_query).execute().print() table_env.sql_query(sql_query).execute_insert("kafka_test").wait() if __name__ == "__main__": test_py_udf_kafka() | | faronzz | | faro...@163.com | | | faronzz | | faro...@163.com |