Hi Timo, I actually tried many things, increasing jvm heap size and flink managed memory didn't help me. Running the same query without group by clause like this:
select avg(transaction_amount) as avg_ta, avg(salary+bonus) as avg_income, avg(salary+bonus) - avg(transaction_amount) as spending from transactions t left join customers c on t.customer_id = c.customer_id And execution time is 33 seconds, which is great, because is one row sink, also I tried only to join tables, but when sink is around 5 million, execution time is 986 seconds which I find strange, because it's only join, no aggregations. This is my connector code which is almost the same for both queries(output rows are different missing first and last name, because of removing group by) with group by and without group by: t_env.connect(FileSystem().path('transactions.csv')) \ .with_format(OldCsv().ignore_first_line().field_delimiter(",").quote_character('\"') .field('transaction_id', DataTypes.STRING()) .field('product_id', DataTypes.STRING()) .field('transaction_amount', DataTypes.DOUBLE()) .field('transaction_date', DataTypes.STRING()) .field('customer_id', DataTypes.STRING()) ) \ .with_schema(Schema() .field('transaction_id', DataTypes.STRING()) .field('product_id', DataTypes.STRING()) .field('transaction_amount', DataTypes.DOUBLE()) .field('transaction_date', DataTypes.STRING()) .field('customer_id', DataTypes.STRING()) ) \ .create_temporary_table('transactions') t_env.connect(FileSystem().path('join_output.csv')) \ .with_format(Csv().derive_schema() ) \ .with_schema(Schema() .field('avg_ta', DataTypes.DOUBLE()) .field('avg_income', DataTypes.INT()) .field('spending', DataTypes.DOUBLE()) ) \ .create_temporary_table('mySink') Customers table is the same, only with different fields. Thanks again!