Re: Subpar performance of temporal joins with RocksDB backend

2021-07-20 Thread Robert Metzger
Are you using remote disks for RocksDB? (I guess that's EBS on AWS.) AFAIK
there are usually limits on the IOPS you can perform.

I would generally recommend measuring where the bottleneck comes from.
It could be that your CPUs are at 100%; then adding more machines / cores
will help (make sure that all CPU cores are in use by setting the
parallelism >= number of cores). With the RocksDB state backend, Flink needs
to serialize all records. That's not necessary with the heap backend, because
the data stays on the heap.
Or your bottleneck is the EBS / disk storage, where the bandwidth / IOPS is
at its limit.
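For reference, the knobs involved would look roughly like this in
flink-conf.yaml (option names from the Flink 1.13 docs; the values are
placeholders to adapt to your setup):

```yaml
# Use the RocksDB state backend (every state access serializes records).
state.backend: rocksdb

# Keep RocksDB's working files on fast local disk rather than a remote
# volume, in case disk IOPS turn out to be the bottleneck (path assumed).
state.backend.rocksdb.localdir: /mnt/local-nvme/rocksdb

# Make sure all CPU cores are in use: parallelism >= number of cores
# (a c5.4xlarge has 16 vCPUs).
parallelism.default: 16
taskmanager.numberOfTaskSlots: 16
```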


On Mon, Jul 19, 2021 at 4:22 PM Adrian Bednarz 
wrote:

> Thanks Maciej, I think this has helped a bit. We are now at 2k/3k eps on a
> single node. Now, I just wonder if this isn't too slow for a single node
> and such a simple query.


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-19 Thread Adrian Bednarz
Thanks Maciej, this has helped a bit. We are now at 2k–3k eps on a
single node. Now I just wonder whether that isn't too slow for a single node
and such a simple query.

On Sat, Jul 10, 2021 at 9:28 AM Maciej Bryński  wrote:

> Could you please set 2 configuration options:
> - state.backend.rocksdb.predefined-options =
> SPINNING_DISK_OPTIMIZED_HIGH_MEM
> - state.backend.rocksdb.memory.partitioned-index-filters = true
>
> Regards,
> Maciek


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-10 Thread Maciej Bryński
Could you please set two configuration options:
- state.backend.rocksdb.predefined-options = SPINNING_DISK_OPTIMIZED_HIGH_MEM
- state.backend.rocksdb.memory.partitioned-index-filters = true
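In flink-conf.yaml syntax, that would be roughly:

```yaml
# RocksDB tuning profile for spinning disks with plenty of memory.
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
# Keep index and filter blocks in (partitioned) cache in managed memory
# instead of re-reading them from disk on lookups.
state.backend.rocksdb.memory.partitioned-index-filters: true
```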

Regards,
Maciek


-- 
Maciek Bryński


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-10 Thread Adrian Bednarz
I didn’t tweak any RocksDB knobs. The only thing we did was to increase
managed memory to 12 GB, which was supposed to help RocksDB according to the
documentation. The rest stays at the defaults. Incremental checkpointing
was enabled as well, but it made no difference in performance when we
disabled it.
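For completeness, those non-default settings boil down to something like this
in flink-conf.yaml (option names from the Flink docs; values as described
above):

```yaml
# Managed memory, which the RocksDB backend uses for its block cache etc.
taskmanager.memory.managed.size: 12g
# Incremental checkpoints; toggling this made no measurable difference.
state.backend.incremental: true
```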

On Fri, 9 Jul 2021 at 20:43, Maciej Bryński  wrote:

> Hi Adrian,
> Could you share your state backend configuration ?
>
> Regards,
> Maciek


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Maciej Bryński
Hi Adrian,
Could you share your state backend configuration?

Regards,
Maciek

Fri, 9 Jul 2021 at 19:09 Adrian Bednarz  wrote:
>
> Hello,
>
> We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we 
> hit significant, unexpected performance degradation when changing the state 
> backend to RocksDB.
>
> We performed tests with two tables: fact table TXN and dimension table 
> CUSTOMER with the following schemas:
>
> TXN:
>  |-- PROD_ID: BIGINT
>  |-- CUST_ID: BIGINT
>  |-- TYPE: BIGINT
>  |-- AMOUNT: BIGINT
>  |-- ITEMS: BIGINT
>  |-- TS: TIMESTAMP(3) **rowtime**
>  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>
> CUSTOMER:
>  |-- ID: BIGINT
>  |-- STATE: BIGINT
>  |-- AGE: BIGINT
>  |-- SCORE: DOUBLE
>  |-- PRIMARY KEY: ID
>
> And the following query:
> SELECT state, SUM(amount) FROM txn t JOIN customer FOR SYSTEM_TIME AS OF t.ts 
> ON t.cust_id = customer.id GROUP BY state, TUMBLE(t.ts, INTERVAL '1' SECOND)
>
> In our catalog, we reconfigured the customer table so that the watermark is 
> set to infinity on that side of the join. We generate data in a round-robin 
> fashion (except for the timestamp, which grows in steps of 1 ms).
>
> We performed our experiments on a single c5.4xlarge machine with heap and 
> managed memory each set to 12 GB, and a blackhole sink. With 2 000 000 fact 
> records and 100 000 dimension records, a job with the heap backend finishes 
> in 5 seconds, whereas with RocksDB it takes 1h 24m. With 400 000 dimension 
> records the runtime doesn't grow proportionally but goes up to 1h 36m (the 
> job processes more records, after all).
>
> We also checked what would happen if we reduced the number of customer IDs 
> to 1. Our expectation was that RocksDB would no longer offload anything to 
> disk, so performance should be comparable with the heap backend. That run 
> still took 10 minutes.
>
> Is this something anybody has experienced, or is it to be expected? Of 
> course, we assumed RocksDB would be slower, but 300 eps is below our 
> expectations.
>
> Thanks,
> Adrian



-- 
Maciek Bryński