Hello,

We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we
hit an unexpected and significant performance degradation after switching
the state backend to RocksDB.

We performed tests with two tables: fact table TXN and dimension table
CUSTOMER with the following schemas:

TXN:
 |-- PROD_ID: BIGINT
 |-- CUST_ID: BIGINT
 |-- TYPE: BIGINT
 |-- AMOUNT: BIGINT
 |-- ITEMS: BIGINT
 |-- TS: TIMESTAMP(3) *ROWTIME*
 |-- WATERMARK FOR TS: TIMESTAMP(3) AS `TS` - INTERVAL '0' SECOND

CUSTOMER:
 |-- ID: BIGINT
 |-- STATE: BIGINT
 |-- AGE: BIGINT
 |-- SCORE: DOUBLE
 |-- PRIMARY KEY: ID
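
For reference, the two tables can be declared roughly as follows. This is
only a sketch: the datagen connector and its options below are illustrative
assumptions, not our exact generator setup.

```sql
-- Sketch of the table definitions (connector options are illustrative only)
CREATE TABLE TXN (
  PROD_ID BIGINT,
  CUST_ID BIGINT,
  TYPE    BIGINT,
  AMOUNT  BIGINT,
  ITEMS   BIGINT,
  TS      TIMESTAMP(3),
  WATERMARK FOR TS AS TS - INTERVAL '0' SECOND
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE CUSTOMER (
  ID    BIGINT,
  STATE BIGINT,
  AGE   BIGINT,
  SCORE DOUBLE,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'datagen'
);
```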

And the following query:
SELECT state, SUM(amount)
FROM txn t
JOIN customer FOR SYSTEM_TIME AS OF t.ts
  ON t.cust_id = customer.id
GROUP BY state, TUMBLE(t.ts, INTERVAL '1' SECOND)

In our catalog, we reconfigured the CUSTOMER table so that the watermark on
that side of the join is effectively infinite. We generate data in a
round-robin fashion (except for the timestamp, which grows in steps of 1 ms).

We performed our experiments on a single c5.4xlarge machine, with both heap
and managed memory set to 12 GB, writing to a blackhole sink. With 2 000 000
fact records and 100 000 dimension records, the job finishes in 5 seconds
with the heap backend, whereas with RocksDB it takes 1h 24m. With 400 000
dimension records the runtime does not grow much further, going up to
1h 36m (the job processes more records, after all).
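
For completeness, the relevant parts of our configuration looked roughly
like this. The exact keys are from the standard flink-conf.yaml; the values
reflect the setup described above, and the comments are our reading of the
defaults.

```yaml
# Sketch of the relevant flink-conf.yaml entries (illustrative)
state.backend: rocksdb
taskmanager.memory.task.heap.size: 12g      # heap used in the heap-backend runs
taskmanager.memory.managed.size: 12g        # managed memory, used by RocksDB
state.backend.rocksdb.memory.managed: true  # RocksDB uses managed memory (default)
```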

We also checked what happens if we reduce the number of customer ids to 1.
Our expectation was that RocksDB would no longer need to offload anything to
disk, so performance should be comparable to the heap backend. Even then,
the job took 10 minutes.

Is this something anybody else has experienced, or is it to be expected? Of
course, we assumed RocksDB would perform slower, but ~300 events per second
is far below our expectations.

Thanks,
Adrian
