hi flink community~
I came across a problem I didn't understand,I can't use pyflink
aggfuction function properly in window tvf, The following are available:
java aggfuntion
flink system aggfunction
window (not window tvf)
I want to know if this is a bug or if I'm using it the wrong way?Thanks!
from datetime import datetime, timedelta
from pyflink.table import AggregateFunction
from pyflink.common.typeinfo import Types
from pyflink.common import Row
from pyflink.table import Schema, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
class Sum0(AggregateFunction):
def get_value(self, accumulator):
return accumulator[0]
def create_accumulator(self):
return Row(0)
def accumulate(self, accumulator, *args):
if args[0] is not None:
accumulator[0] += args[0]
def retract(self, accumulator, *args):
if args[0] is not None:
accumulator[0] -= args[0]
def merge(self, accumulator, accumulators):
for acc in accumulators:
accumulator[0] += acc[0]
def get_result_type(self):
return "BIGINT"
def get_accumulator_type(self):
return 'ROW'
def test_py_udf_kafka():
# stream
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
env.set_parallelism(1)
table_env = StreamTableEnvironment.create(stream_execution_environment=env)
ds = env.from_collection(
collection=[(1, 2, "Lee", datetime.now() - timedelta(hours=4)),
(2, 3, "Lee", datetime.now() - timedelta(hours=4)),
(3, 4, "Jay", datetime.now() - timedelta(hours=4)),
(5, 6, "Jay", datetime.now() - timedelta(hours=2)),
(7, 8, "Lee", datetime.now())],
type_info=Types.ROW([Types.INT(),
Types.INT(),
Types.STRING(),
Types.SQL_TIMESTAMP()]))
table_schema = Schema.new_builder() \
.column("f0", "INT") \
.column("f1", "INT") \
.column("f2", "STRING") \
.column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))") \
.watermark("rowtime", "rowtime - INTERVAL '1' SECOND") \
.build()
ts = table_env.from_data_stream(ds, table_schema) \
.alias("value", "count", "name", "rowtime")
print(“schema desc")
ts.print_schema()
sql_sink_dll_1 = """CREATE TABLE kafka_test(
`name` string, `agg_data` bigint)
with (
'connector' = 'kafka',
'topic'='test_java2',
'properties.bootstrap.servers'='agent3:9092',
'value.format' = 'json'
);"""
table_env.execute_sql(sql_sink_dll_1)
table_env.create_temporary_view("source", ts)
table_env.create_temporary_function(
"sum_udf",
Sum0())
sql_query_system = """
select name,sum(`value`) as agg_data from
TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
group by window_start, window_end, name
"""
sql_query = """
select name,sum_udf(`value`) as agg_data from
TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
group by window_start, window_end, name
"""
print(table_env.explain_sql(sql_query))
table_env.sql_query(sql_query).execute().print()
table_env.sql_query(sql_query).execute_insert("kafka_test").wait()
if __name__ == "__main__":
test_py_udf_kafka()