Hi Timo,

I have actually tried many things: increasing the JVM heap size and the Flink
managed memory didn't help. I then ran the same query without the GROUP BY
clause, like this:

select
    avg(transaction_amount) as avg_ta,
    avg(salary+bonus) as avg_income,
    avg(salary+bonus) - avg(transaction_amount) as spending
    from transactions t left join customers c  on t.customer_id = c.customer_id
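To make explicit what this ungrouped query returns, here is a small plain-Python sketch of its semantics. It shows what the query computes, not how Flink executes it. The sample rows are hypothetical and only the column names come from the query. After the LEFT JOIN, a transaction without a matching customer has NULL salary and bonus, and SQL AVG skips NULLs. A customer with several transactions is also counted once per transaction in avg_income.

```python
# Plain-Python sketch of the ungrouped query's semantics (not Flink's execution).
# Sample rows are hypothetical; column names follow the query.

# transactions: (customer_id, transaction_amount)
transactions = [
    ("c1", 100.0),
    ("c1", 50.0),
    ("c2", 30.0),
    ("c9", 20.0),  # no matching customer -> salary/bonus are NULL after the LEFT JOIN
]

# customers: customer_id -> (salary, bonus)
customers = {
    "c1": (1000, 200),
    "c2": (800, 100),
}


def sql_avg(values):
    """Mimic SQL AVG: skip NULLs (None); return None if nothing is left.

    Uses floating-point division; Flink's result type may differ for integer columns.
    """
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None


def ungrouped_query(transactions, customers):
    # LEFT JOIN: every transaction is kept, matched to at most one customer here.
    joined = []
    for customer_id, amount in transactions:
        salary, bonus = customers.get(customer_id, (None, None))
        income = None if salary is None or bonus is None else salary + bonus
        joined.append((amount, income))

    # One output row: the three aggregates over the whole joined table.
    avg_ta = sql_avg(amount for amount, _ in joined)
    avg_income = sql_avg(income for _, income in joined)
    spending = None if avg_income is None or avg_ta is None else avg_income - avg_ta
    return avg_ta, avg_income, spending


print(ungrouped_query(transactions, customers))  # -> (50.0, 1100.0, 1050.0)
```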

Its execution time is 33 seconds, which is great, because the sink receives
only one row. I also tried only joining the tables: the sink then receives
around 5 million rows and the execution time is 986 seconds, which I find
strange, because it's only a join, with no aggregations.
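For comparison with the one-row aggregate: a LEFT JOIN emits at least one row per left-side row. Assuming customer_id is unique in the customers table, the join-only query therefore writes one row per transaction to the sink. The plain-Python hash-join sketch below uses hypothetical data and only illustrates that row count. It says nothing about where Flink spends the 986 seconds.

```python
from collections import defaultdict


def left_join(left, right, key):
    """Hash-based LEFT JOIN of two lists of dicts on a shared key column.

    Illustrative only; this is not how Flink implements its joins.
    """
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)

    out = []
    for row in left:
        matches = index.get(row[key])
        if matches:
            # One output row per matching right-side row.
            out.extend({**row, **match} for match in matches)
        else:
            # Unmatched left rows are kept; right-side columns stay absent (NULL).
            out.append(dict(row))
    return out


# Hypothetical sample data.
transactions = [
    {"transaction_id": "t1", "customer_id": "c1"},
    {"transaction_id": "t2", "customer_id": "c1"},
    {"transaction_id": "t3", "customer_id": "c2"},
    {"transaction_id": "t4", "customer_id": "c9"},
]
customers = [
    {"customer_id": "c1", "salary": 1000},
    {"customer_id": "c2", "salary": 800},
]

joined = left_join(transactions, customers, "customer_id")
print(len(joined), "joined rows vs", len(transactions), "transactions")
```

With a unique join key the joined row count equals the transaction count, so the sink size grows with the input rather than staying at one row.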

This is my connector code, which is almost the same for both queries, with and
without GROUP BY (only the output rows differ: the first and last name columns
are missing because I removed the GROUP BY):

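from pyflink.table import DataTypes
from pyflink.table.descriptors import Csv, FileSystem, OldCsv, Schema

t_env.connect(FileSystem().path('transactions.csv')) \
    .with_format(OldCsv().ignore_first_line().field_delimiter(",").quote_character('\"')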
                 .field('transaction_id', DataTypes.STRING())
                 .field('product_id', DataTypes.STRING())
                 .field('transaction_amount', DataTypes.DOUBLE())
                 .field('transaction_date', DataTypes.STRING())
                 .field('customer_id', DataTypes.STRING())
                 ) \
    .with_schema(Schema()
                 .field('transaction_id', DataTypes.STRING())
                 .field('product_id', DataTypes.STRING())
                 .field('transaction_amount', DataTypes.DOUBLE())
                 .field('transaction_date', DataTypes.STRING())
                 .field('customer_id', DataTypes.STRING())
                 ) \
    .create_temporary_table('transactions')

t_env.connect(FileSystem().path('join_output.csv')) \
    .with_format(Csv().derive_schema()) \
    .with_schema(Schema()
                 .field('avg_ta', DataTypes.DOUBLE())
                 .field('avg_income', DataTypes.INT())
                 .field('spending', DataTypes.DOUBLE())
                 ) \
    .create_temporary_table('mySink')

The customers table is defined the same way, only with different fields.
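The OldCsv settings in the transactions source (ignore_first_line(), field_delimiter(","), quote_character('"')) roughly correspond to the reading logic below, written with Python's standard csv module. It is only a sketch for checking what the source should produce from a file. It is not how Flink parses CSV, and the sample content is hypothetical.

```python
import csv
import io

# Hypothetical sample content; only the column names come from the schema above.
SAMPLE = """transaction_id,product_id,transaction_amount,transaction_date,customer_id
t1,p1,100.5,2020-01-01,c1
t2,"p,2",30.0,2020-01-02,c2
"""


def read_transactions(f):
    reader = csv.reader(f, delimiter=",", quotechar='"')
    next(reader)  # like ignore_first_line(): skip the header row
    rows = []
    for transaction_id, product_id, amount, date, customer_id in reader:
        rows.append({
            "transaction_id": transaction_id,      # DataTypes.STRING()
            "product_id": product_id,              # DataTypes.STRING()
            "transaction_amount": float(amount),   # DataTypes.DOUBLE()
            "transaction_date": date,              # DataTypes.STRING()
            "customer_id": customer_id,            # DataTypes.STRING()
        })
    return rows


rows = read_transactions(io.StringIO(SAMPLE))
print(rows[1]["product_id"])
```

The quoted field "p,2" stays a single value because of the quote character, which is the behaviour the quote_character setting is meant to give.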

Thanks again!