Thanks Junhao for your proposal,

Could you send a PIP discussion email in the formal format? For example:
https://lists.apache.org/thread/dyq1jyrj8cqb26z84bhqr7xx1pn7ctj8

The content of the PIP could be reworked into the formal format as well.

Best,
Jingsong

On Mon, May 22, 2023 at 4:49 PM JUNHAO YE <[email protected]> wrote:

> Hi, Guys.
> I want to start a conversation about improving Paimon batch processing; see
> https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket
>
> *PREFACE:*
>
> Currently, Paimon has strong support for streaming write and streaming read,
> but its support for traditional batch processing is not as mature. After a
> table is created, you need to explicitly specify the bucket key and bucket
> number; otherwise, an append-only or changelog table with a single bucket is
> created. With only one bucket, concurrent read/write and compaction cannot
> be performed, resulting in poor batch performance.
> In a traditional offline data warehouse, users do not care about buckets and
> do not want to have to care about them. For offline batch computing, the
> bucket concept that Paimon introduced for real-time streaming read and write
> does not apply. Without streaming read and write, the record order guaranteed
> by sequence.field is not very meaningful: there is no streaming retract, only
> concurrently inserted data flowing into the table, and the rest of the
> processing is done in batch mode.
>
> Therefore, we need to design a table that meets the following
> characteristics:
> 1. No concept of primary key or bucket key, equivalent to a plain table in
> an offline data warehouse.
> 2. In stream mode, only inserting data is supported (only "I"-type records).
> As an offline table, data can be inserted synchronously in real time, but
> there is no need for real-time delete, update, etc.
> 3. Streaming write supports concurrent write and concurrent compaction.
> 4. Data is updated and deleted through batch tasks.
> 5. The read order of records is not guaranteed to match the write order.
>
>
> *SCENARIO:*
>
> The customer has a large amount of order transaction data and expects it to
> flow into the offline table automatically as transaction data arrives, with
> batch statistics computed at 12 o'clock every day.
> Combined with the Flink compute engine, we can create a table like this when
> only an offline batch table is needed:
>
> *CREATE TABLE Orders (
>     order_id        INT,
>     order_type      STRING,
>     `date`          TIMESTAMP,
>     price           INT,
>     number          INT
> ) PARTITIONED BY (order_type)
> WITH (
> 'write-mode' = 'table'
> );*
>
>
> There isn’t any property about the bucket.
>
> We want this table to be used for offline OLAP analysis, such as once-a-day
> statistics. Its data volume and traffic are large, so we want it to keep
> itself up to date:
>
> *INSERT INTO Orders
>       SELECT * FROM OrderSource;*
>
>
> Conduct off-line analysis and query to calculate the total sales in one day:
>
> *SELECT sum(price * number) FROM Orders GROUP BY DATE_FORMAT(`date`,
> 'yyyyMMdd');*
>
>
> Count the number of transactions per order type:
>
> *SELECT order_type, count(*) FROM Orders GROUP BY order_type;*
>
>
> The order type has changed, and all historical data needs to be updated
> (flink-version >= 1.17):
>
> *UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND
> `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-MM-dd');*
>
>
> *DESIGN:*
>
> At the Paimon project level, we need a new table and a new write mode. In
> this mode:
>
> 1. Only +I data; no +U, -U, or -D record types.
>
> 2. All data goes into the default bucket=0 file directory (for consistency
> with the previous design).
>
> 3. No sequence.number; ordered reading and writing of data is not required.
>
> 4. Compaction is separated from the writer, so the writer is no longer also
> responsible for compaction. This solves the problem of compacting a single
> bucket that is written concurrently (write-only=true by default when the
> table is created).
>
> 5. Two new components are introduced: CompactionCoordinator and
> CompactionWorker. The CompactionCoordinator is a single-parallelism
> coordinator that receives the files written by upstream writers. The
> CompactionWorker is a multi-parallelism compaction executor that runs only
> the compaction tasks assigned by the coordinator.
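>
> The coordinator/worker split in point 5 can be sketched as follows. This is
> a simplified, hypothetical illustration (the class shapes, the file-count
> trigger strategy, and all names are assumptions, not Paimon's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the coordinator/worker split described in point 5.
// The single-parallelism coordinator collects files reported by writers and,
// once enough files accumulate (an assumed file-count trigger strategy),
// emits a compaction task; workers only execute tasks they are handed.
public class CompactionSketch {

    /** A newly written data file reported by a writer. */
    record DataFile(String path, long sizeBytes) {}

    /** A unit of work handed from the coordinator to a worker. */
    record CompactionTask(List<DataFile> inputs) {}

    /** Single parallelism: decides WHAT to compact, never compacts itself. */
    static class CompactionCoordinator {
        private final List<DataFile> pending = new ArrayList<>();
        private final int fileCountTrigger;

        CompactionCoordinator(int fileCountTrigger) {
            this.fileCountTrigger = fileCountTrigger;
        }

        void onFileWritten(DataFile file) {
            pending.add(file);
        }

        /** Called at prepareSnapshot: emit a task if the strategy fires. */
        List<CompactionTask> plan() {
            List<CompactionTask> tasks = new ArrayList<>();
            if (pending.size() >= fileCountTrigger) {
                tasks.add(new CompactionTask(List.copyOf(pending)));
                pending.clear();
            }
            return tasks;
        }
    }

    /** Multi parallelism: executes only what the coordinator assigned. */
    static class CompactionWorker {
        DataFile execute(CompactionTask task) {
            long mergedSize =
                    task.inputs().stream().mapToLong(DataFile::sizeBytes).sum();
            return new DataFile("compacted-" + task.inputs().size() + "-files",
                    mergedSize);
        }
    }
}
```

> Because planning is separated from execution, any number of workers can run
> in parallel while a single coordinator keeps a consistent view of the files.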
>
>
> At the compute engine level, we build the following topology for real-time
> writes:
>
> [image: image.png]
>
>
> [image: pasted-graphic-2.tiff]
>
> 1. In the prepareSnapshot phase, the writers flush their new files and the
> compaction coordinator receives them; the coordinator also reads the delta
> files of the latest snapshot and adds them to its restored files.
>
>    Then, based on a strategy, it decides whether to create a compaction task.
>
>
> 2. The compaction coordinator delivers compaction tasks. Each compaction
> worker executes the tasks assigned to it, covering the files chosen for
> compaction and any new files submitted by the writers.
>
>    After execution it rebuilds the commit message and passes it to the
> downstream committer.
>
>
> 3. In the snapshot phase, each operator saves its state.
>
>
> 4. In the notifyCheckpointComplete phase, the committer commits the
> compacted file information and generates a new snapshot. The next time the
> compaction coordinator prepares a snapshot,
>
>    it reads the snapshot delta (and updates its restored files based on the
> saved snapshot id).
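>
> The cursor logic in steps 1 and 4 can be sketched as follows. This is a
> hypothetical simplification (all names and structures are assumptions): the
> coordinator remembers the last snapshot id it consumed in its operator
> state and, at prepareSnapshot, picks up only the deltas of newer snapshots:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of steps 1 and 4: the coordinator keeps a cursor
// (the last snapshot id it has read) and, at each prepareSnapshot, adds only
// the delta files of newer snapshots to its restored files. The names here
// are illustrative, not Paimon's actual classes.
public class SnapshotDeltaSketch {

    /** A committed snapshot and the delta files it introduced. */
    record Snapshot(long id, List<String> deltaFiles) {}

    static class Coordinator {
        private long lastReadSnapshotId = -1; // restored from operator state
        private final List<String> restoredFiles = new ArrayList<>();

        /** Called at prepareSnapshot: consume deltas newer than the cursor. */
        List<String> readNewDeltas(List<Snapshot> snapshots) {
            List<String> picked = new ArrayList<>();
            for (Snapshot s : snapshots) {
                if (s.id() > lastReadSnapshotId) {
                    picked.addAll(s.deltaFiles());
                    lastReadSnapshotId = s.id(); // advance the saved cursor
                }
            }
            restoredFiles.addAll(picked);
            return picked;
        }

        List<String> restoredFiles() {
            return restoredFiles;
        }
    }
}
```

> Re-reading the same snapshot list is then a no-op, so a snapshot's delta is
> added to the restored files exactly once across checkpoints.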
>
>
>
> Delete and update scenarios use batch mode. Row kinds such as +U, -U, and
> -D are not used when deleting or updating; instead, deletes and updates are
> implemented by real file replacement.
>
> After each delete or update operation, a new snapshot is generated.
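>
> The file-replacement approach can be sketched as follows. This is a
> hypothetical, in-memory illustration (the row and file structures are
> assumed): a batch delete rewrites only the files that contain matching rows
> and keeps the rest untouched; committing the result forms the new snapshot:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of copy-on-write delete: no -D/-U rows are produced;
// instead, every file containing a matching row is rewritten without those
// rows, and untouched files are carried over as-is. The structures here are
// illustrative only, not Paimon's file format.
public class CopyOnWriteDelete {

    record Row(String orderType, int price) {}

    record DataFile(String name, List<Row> rows) {}

    /** Rewrite files that contain matching rows; keep the others as-is. */
    static List<DataFile> delete(List<DataFile> files, Predicate<Row> toDelete) {
        List<DataFile> newFileList = new ArrayList<>();
        for (DataFile f : files) {
            if (f.rows().stream().noneMatch(toDelete)) {
                newFileList.add(f); // no match: the original file survives
            } else {
                List<Row> kept = f.rows().stream()
                        .filter(toDelete.negate())
                        .toList();
                if (!kept.isEmpty()) { // fully deleted files simply disappear
                    newFileList.add(new DataFile(f.name() + "-rewritten", kept));
                }
            }
        }
        return newFileList; // committing this list yields the new snapshot
    }
}
```

> An update works the same way, except matching rows are rewritten with new
> values instead of being dropped.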
>
>
>
> Please write back with your opinions.
>
>
>
> Best,
>
> Junhao.
>
>
