Hi!

Changes to the input tables will cause corresponding changes in the output table


Which sink are you using? If it is an upsert sink, the Flink SQL planner
will filter out UPDATE_BEFORE messages automatically. Also, if your sink
supports something like "ignore delete messages", it can filter out delete
messages as well and affect the downstream less.
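
For example, here is a minimal sketch of an upsert sink using the
upsert-kafka connector (the table name, topic, and columns are made up for
illustration, not taken from your schema). With a PRIMARY KEY declared, the
planner writes upserts to this sink and drops UPDATE_BEFORE messages:

```
-- Hypothetical upsert sink; adjust columns, topic and servers to your setup.
CREATE TABLE vital_signs_agg_sink (
  pid STRING,
  vid STRING,
  temperature STRING,   -- simplified; your query actually produces MULTISET<ROW<...>> columns
  PRIMARY KEY (pid, vid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'vital_signs_agg',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
```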

Mini-batch will also help in this case. If mini-batch is enabled,
aggregations only send updates to the downstream once per batch, thus
decreasing the number of records flowing downstream.
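
A sketch of the mini-batch settings (the values below are only examples;
tune them to your latency and throughput needs):

```
-- Enable mini-batch so aggregations buffer input and emit one update per batch.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';  -- example buffering latency
SET 'table.exec.mini-batch.size' = '5000';          -- example max rows buffered per batch
```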

For better performance on aggregations, you can also try local-global
aggregation. See [1] for details.
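
Local-global aggregation is switched on with the optimizer option below (it
requires mini-batch to be enabled); again, just a sketch:

```
-- Split the aggregation into a local pre-aggregation and a global aggregation.
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
```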

Row-Based Storage


This depends on the format you use. Although Flink's current computation
model is row-based, it still supports column-based formats like Parquet and
has a number of optimizations for them. If you enable mini-batch and
two-phase aggregation, most jobs will meet their performance needs.
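
As an illustration only (the connector options, path and columns here are
assumptions, not from your pipeline), a filesystem table backed by Parquet
could be declared like this:

```
-- Hypothetical Parquet-backed table on HDFS.
CREATE TABLE vital_signs_parquet (
  pid STRING,
  vid STRING,
  signType STRING,
  temperature DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///warehouse/vital_signs',  -- hypothetical path
  'format' = 'parquet'
);
```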

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation

vtygoss <vtyg...@126.com> wrote on Mon, Dec 13, 2021, at 17:13:

> Hi, community!
>
>
> I have run into a problem while building a streaming production
> pipeline using Flink retract streams, with Hudi-HDFS/Kafka as the storage
> engine and RocksDB as the state backend.
>
>
> In my scenario,
>
> - During a patient's hospitalization, multiple measurements of vital signs
> are recorded, including temperature, pulse, blood pressure and so on.
>
> - Each type of vital sign contains 20+ or more records with PRIMARY
> KEY(patientId, visitId, signType, time) in the table tbl_vis_vital_signs
> mentioned in the code below.
>
>
> And I need to aggregate all the vital sign records together
> through JOIN or COLLECT with FILTER, as in the code below.
>
>
> ```
> SELECT pid, vid,
>   COLLECT(ROW(..., temperature, ...)) FILTER (WHERE signType = 'temperature') AS temperature,
>   COLLECT(ROW(..., pulse, ...)) FILTER (WHERE signType = 'pulse') AS pulse,
>   COLLECT(...) FILTER (WHERE ...) AS bloodpressure
> FROM tbl_vis_vital_signs
> GROUP BY pid, vid
> ```
>
>
> With the help of Flink CDC and Kafka/Hudi-HDFS, we want to build a streaming
> production pipeline, with the data flow below.
>
>
> Database  --[CDC tools]-->  Kafka  --[sync]-->  Dynamic
> Table (Kafka/Hudi-HDFS)  --[Flink SQL (retract stream)]-->  Dynamic Table
>
>
> The problem is caused by the following three factors.
>
> 1. Data inflation:
>
> 1) Major: changes to the input tables cause corresponding changes in the
> output table, e.g. for join and aggregation. In the code above, every change
> to each row in tbl_vis_vital_signs retracts the outdated result containing
> all vital signs' info and sends a new result. Worse, there are many vital
> sign records per hospitalization, which causes many retract and re-send
> operations that have to be consumed by all downstream consumers.
>
> 2) Minor: each CDC update event is split into two events: a deletion of the
> old record and an insertion of the new record.
>
> 2. Kafka / Hudi-HDFS / RocksDB append incremental data to the full data:
>
> 1) RocksDB and Hudi-HDFS use an incremental model like an LSM tree; they
> append incremental events to the full data, whether insertions or deletions.
>
> 2) Even upsert-kafka implements deletes by inserting tombstones.
>
> 3. Row-Based Storage
>
>
> In my scenario, these factors will cause problems:
>
> 1. A large number of low-value intermediate results per primary key consume
> the throughput of the Flink application.
>
> 2. Heavy checkpoints: in every checkpoint (aligned, every 10 sec), the
> incremental block data of RocksDB is several GB, and it takes several
> minutes to complete even when it succeeds. But only a few GB of data exist
> in the HDFS checkpoint directory.
>
> 3. Low performance of the application and low stability of the TaskManager JVM.
>
>
> So, would mini-batch improve this scenario?
>
> Thanks for any reply or suggestions.
>
>
> Best Regards!
>
>
> 2021-12-13 17:10:00
>
>
>
>
>
