Hi,
Currently, Flink does not have a concept like an in-memory table for
temporarily storing data, because Flink is a compute engine and does not store
data itself.
May I ask what the purpose of the temporary table is? Would it be possible to
use a filesystem connector as the temporary table as a workaround?
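For example, a rough sketch of that workaround could look like the code below.
The local path and the csv format are just placeholder assumptions for
illustration; adjust them to your environment.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming Table API environment, matching the Kafka ingestion use case.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A staging table backed by the filesystem connector, which can be used both
# as a sink (to write the staged rows) and as a source (to read them back).
# The path and format are placeholders, not taken from your setup.
t_env.execute_sql("""
    CREATE TABLE stg_table (
        method STRING,
        `value` DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/stg_table',
        'format' = 'csv'
    )
""")

Note that in streaming mode the filesystem sink only commits finished files on
checkpoints, so checkpointing should be enabled before the staged data becomes
visible to a downstream query.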
--
Best!
Xuyang
At 2024-06-07 03:26:27, "Phil Stavridis" <[email protected]> wrote:
Hello,
I am trying to create an in-memory table in PyFlink to use as a staging table
after ingesting some data from Kafka, but it doesn’t work as expected.
I have used the print connector, which prints the results, but I need a
similar connector that stores the staging results. I have tried blackhole and
datagen, but they can only be used as a sink, not as a source to read from for
further processing. Any suggestions on how to do that in Flink?
t_env.execute_sql("""
    CREATE TABLE stg_table (
        method STRING,
        `value` DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")
result_table = t_env.sql_query(
    """
    SELECT
        method,
        SUM(`value`) AS total_value
    FROM stg_table
    GROUP BY method
    """)
with result_table.execute().collect() as results:
    for result in results:
        print(result)