发件人: faronzz
发送时间: 2023年8月26日 22:12
收件人: user-zh@flink.apache.org
主题: pyflink aggfunction in window tvf can not sink to connector='kafka', it
reports "consuming update changes"; java aggfunction and built-in aggfunction in flink,
such as sum, are ok
hi~
I came across a problem I don't understand: I can't use a pyflink
aggfunction properly in a window tvf. The following alternatives do work:
java aggfunction
flink system aggfunction
window (not window tvf)
I want to know if this is a bug or if I'm using it the wrong way?
pyflink 1.17.1
flink 1.17.1
from datetime import datetime, timedelta
from pyflink.table import AggregateFunction
from pyflink.common.typeinfo import Types
from pyflink.common import Row
from pyflink.table import Schema, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
class Sum0(AggregateFunction):
    """Python aggregate function that sums its first argument, skipping NULLs.

    The accumulator is a one-field ``Row`` holding the running total. It
    starts at 0, so a group whose inputs are all NULL yields 0.
    """

    def get_value(self, accumulator):
        """Return the running total stored in the accumulator."""
        return accumulator[0]

    def create_accumulator(self):
        """Return a fresh accumulator: a single-field row initialised to 0."""
        return Row(0)

    def accumulate(self, accumulator, *args):
        """Add the first argument to the running total unless it is None."""
        if args[0] is not None:
            accumulator[0] += args[0]

    def retract(self, accumulator, *args):
        """Subtract the first argument from the running total unless it is None."""
        if args[0] is not None:
            accumulator[0] -= args[0]

    def merge(self, accumulator, accumulators):
        """Fold the totals of the other accumulators into ``accumulator``."""
        for acc in accumulators:
            accumulator[0] += acc[0]

    def get_result_type(self):
        """Return the SQL type of the aggregate result."""
        return "BIGINT"

    def get_accumulator_type(self):
        """Return the SQL type of the accumulator row."""
        # A bare 'ROW' declares no fields; spell out the single BIGINT field
        # so the declared type matches the Row(0) built by create_accumulator.
        return 'ROW<f0 BIGINT>'
def test_py_udf_kafka():
    """Run the window-TVF aggregation with the Python UDAF against a Kafka sink.

    Builds a five-row in-memory stream, exposes it as the view ``source``,
    registers ``Sum0`` as ``sum_udf``, and then, for the tumbling-window query
    that uses ``sum_udf``, prints the plan, prints the result, and inserts
    the result into the Kafka table ``kafka_test``.
    """
    # streaming mode
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Sample rows: (value, count, name, event time).
    rows = [
        (1, 2, "Lee", datetime.now() - timedelta(hours=4)),
        (2, 3, "Lee", datetime.now() - timedelta(hours=4)),
        (3, 4, "Jay", datetime.now() - timedelta(hours=4)),
        (5, 6, "Jay", datetime.now() - timedelta(hours=2)),
        (7, 8, "Lee", datetime.now()),
    ]
    row_type = Types.ROW([
        Types.INT(),
        Types.INT(),
        Types.STRING(),
        Types.SQL_TIMESTAMP(),
    ])
    input_stream = env.from_collection(collection=rows, type_info=row_type)

    # Derive a TIMESTAMP(3) rowtime column from f3 and attach a watermark
    # that lags it by one second.
    schema = (
        Schema.new_builder()
        .column("f0", "INT")
        .column("f1", "INT")
        .column("f2", "STRING")
        .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))")
        .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
        .build()
    )
    source_table = table_env.from_data_stream(input_stream, schema)
    source_table = source_table.alias("value", "count", "name", "rowtime")
    print("打印源表结构")
    source_table.print_schema()

    # Kafka sink table definition.
    sink_ddl = """CREATE TABLE kafka_test(
`name` string, `agg_data` bigint)
with (
'connector' = 'kafka',
'topic'='test_java2',
'properties.bootstrap.servers'='agent3:9092',
'value.format' = 'json'
);"""
    table_env.execute_sql(sink_ddl)

    table_env.create_temporary_view("source", source_table)
    table_env.create_temporary_function("sum_udf", Sum0())

    # Same query with the built-in sum; kept for comparison, not executed.
    sql_query_system = """
select name,sum(`value`) as agg_data from
TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
group by window_start, window_end, name
"""
    # Query under test: the Python UDAF inside a window TVF aggregation.
    udf_query = """
select name,sum_udf(`value`) as agg_data from
TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
group by window_start, window_end, name
"""
    plan = table_env.explain_sql(udf_query)
    print(plan)

    printed_result = table_env.sql_query(udf_query).execute()
    printed_result.print()

    insert_job = table_env.sql_query(udf_query).execute_insert("kafka_test")
    insert_job.wait()
if __name__ == "__main__":
    # Script entry point: run the reproduction when executed directly.
    test_py_udf_kafka()
| |
faronzz
|
|
faro...@163.com
|
| |
faronzz
|
|
faro...@163.com
|