Hello,

I am trying to create an in-memory table in PyFlink to use as a staging table 
after ingesting some data from Kafka but it doesn’t work as expected.

I have used the print connector which prints the results but I need to use a 
similar connector that stores staging results.  I have tried with blackhole and 
datagen but they can only be used a sink not a source to read from for further 
processing. Any suggestion on how to do that in Flink?
t_env.execute_sql("""
        CREATE TABLE stg_table (
        method STRING,
        value DOUBLE
        ) WITH (
                'connector' = 'print'
                )
        """)



result_table = t_env.sql_query(
"""
SELECT
method,
SUM(value) AS total_value
FROM stg_table
GROUP BY method
""")

with result_table.execute().collect() as results:
        for result in results:
        print(result)   

Reply via email to