Thanks Junhao. I noticed that you have created a PIP [1]. Could you update your PIP to follow the PIP Template [2]? That would help us understand your proposal better.
[1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket?src=contextnavpagetreemode
[2] https://cwiki.apache.org/confluence/display/PAIMON/PIP+Template

Best,
Shammon FY

On Mon, May 22, 2023 at 9:14 PM Jingsong Li <[email protected]> wrote:
> Thanks Junhao for your proposal,
>
> Can you send a PIP discussion email in the formal format? Just like
> https://lists.apache.org/thread/dyq1jyrj8cqb26z84bhqr7xx1pn7ctj8
>
> And the content of the PIP can be brought into the formal format too.
>
> Best,
> Jingsong
>
> On Mon, May 22, 2023 at 4:49 PM JUNHAO YE <[email protected]> wrote:
> > Hi guys,
> > I want to start a conversation about improving Paimon batch processing; see
> > https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket
> >
> > *PREFACE:*
> >
> > Currently, Paimon has very strong support for streaming writes and
> > streaming reads, but not enough for traditional batch processing. When
> > creating a table, you must explicitly specify the bucket key and bucket
> > number; otherwise, an append-only or changelog table with a single
> > bucket is created. With only one bucket, concurrent reads, writes, and
> > compaction cannot be performed, which results in poor batch performance.
> > In a traditional offline data warehouse, users do not care about buckets
> > and do not want to have to care about them. For offline batch computing,
> > the bucket concept that Paimon created for real-time streaming reads and
> > writes does not apply. Without streaming reads and writes, the record
> > order guaranteed by sequence.field is also of little use: since there is
> > no streaming retract, only concurrently inserted data flows into the
> > table, and the rest of the processing is done in batch mode.
> >
> > Therefore, we need to design a table with the following characteristics:
> > 1. There is no concept of primary key or bucket key; the table is
> > equivalent to a traditional unbucketed offline table.
> > 2. In streaming mode, only inserting data is supported (only "I"-type
> > records). As an offline table, data can be inserted synchronously in
> > real time, but there is no need for real-time delete, update, etc.
> > 3. Streaming writes support concurrent writing and concurrent compaction.
> > 4. Data is updated and deleted through batch jobs.
> > 5. Data is not guaranteed to be read in the order it was written.
> >
> > *SCENARIO:*
> >
> > A customer has a large amount of order transaction data. They expect it
> > to flow into the offline table automatically whenever new transaction
> > data is available, and batch statistics jobs run at 12 o'clock every day.
> > Combined with the Flink compute engine, we can create such a table when
> > only an offline batch table is needed:
> >
> > *CREATE TABLE Orders (
> >   order_id INT,
> >   order_type STRING,
> >   `date` TIMESTAMP,
> >   price INT,
> >   number INT
> > ) PARTITIONED BY (order_type)
> > WITH (
> >   'write-mode' = 'table'
> > );*
> >
> > There isn't any property about the bucket.
> >
> > We want this table to be used for offline OLAP analysis, such as
> > once-a-day statistics. But its data volume and traffic are large, so we
> > hope it can keep itself up to date:
> >
> > *INSERT INTO Orders
> > SELECT * FROM OrderSource;*
> >
> > Run an offline analytical query to calculate the total sales per day:
> >
> > *SELECT sum(price * number) FROM Orders
> > GROUP BY DATE_FORMAT(`date`, 'yyyyMMdd');*
> >
> > Count the number of transactions per order type:
> >
> > *SELECT order_type, count(*) FROM Orders GROUP BY order_type;*
> >
> > An order type has changed and all historical data needs to be updated
> > (Flink version >= 1.17):
> >
> > *UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND
> > `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-MM-dd');*
> >
> > *DESIGN:*
> >
> > At the Paimon project level, we need a new table and a new write mode.
> > In this mode:
> >
> > 1. There is only +I data; no +U / -U / -D record types.
> >
> > 2. All data goes to the default bucket-0 file directory (for consistency
> > with the previous design).
> >
> > 3. There is no sequence.number; ordered reading and writing of data is
> > not required.
> >
> > 4. Compaction is separated from the writer, so the writer is no longer
> > responsible for compaction at the same time. This solves the problem of
> > compacting a single bucket while it is written concurrently
> > (write-only=true by default when the table is created).
> >
> > 5. Two new components are created: CompactionCoordinator and
> > CompactionWorker. The CompactionCoordinator is a single-parallelism
> > coordinator that receives the files written by upstream writers. The
> > CompactionWorker is a multi-parallelism compaction executor that runs
> > only the compaction tasks assigned by the coordinator. A rough sketch
> > of this split follows below.
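> >
> > To make the division of labor concrete, here is a minimal, illustrative
> > Java sketch of the coordinator/worker split. The class names come from
> > the design above, but every method, field, and the file-count trigger
> > are assumptions for illustration, not existing Paimon APIs:
> >
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > /** A unit of compaction work handed from the coordinator to a worker. */
> > class CompactionTask {
> >     final List<String> inputFiles;
> >     CompactionTask(List<String> inputFiles) { this.inputFiles = inputFiles; }
> > }
> >
> > /** Single-parallelism coordinator: collects the files flushed by the
> >  *  writers and decides, by some strategy, when to emit a compaction. */
> > class CompactionCoordinator {
> >     private final List<String> pendingFiles = new ArrayList<>();
> >     private final int trigger; // hypothetical strategy: compact every N files
> >
> >     CompactionCoordinator(int trigger) { this.trigger = trigger; }
> >
> >     /** Called for each new file reported by an upstream writer. */
> >     List<CompactionTask> onNewFile(String fileName) {
> >         pendingFiles.add(fileName);
> >         List<CompactionTask> tasks = new ArrayList<>();
> >         if (pendingFiles.size() >= trigger) {
> >             tasks.add(new CompactionTask(new ArrayList<>(pendingFiles)));
> >             pendingFiles.clear();
> >         }
> >         return tasks;
> >     }
> > }
> >
> > /** Multi-parallelism executor: runs only the tasks assigned by the
> >  *  coordinator and reports the result downstream. */
> > class CompactionWorker {
> >     String execute(CompactionTask task) {
> >         // A real implementation would merge task.inputFiles into one new
> >         // data file and build a commit message for the committer.
> >         return "compacted-" + Math.abs(task.inputFiles.hashCode());
> >     }
> > }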
> >
> > At the compute-engine level, we build the following topology for
> > real-time writing:
> >
> > [two images omitted: the real-time write topology]
> >
> > 1. In the prepareSnapshot phase, the writers flush their new files and
> > the compaction coordinator receives them. The compaction coordinator
> > also reads the delta files of the latest snapshot and adds them to its
> > restored files. Then, depending on a strategy, it decides whether to
> > create a compaction.
> >
> > 2. The compaction coordinator delivers the compaction tasks. Each
> > compaction worker executes the tasks it is given, covering both the
> > compactions produced by the coordinator and the new files submitted by
> > the writers. After execution it builds the commit message and passes it
> > to the downstream committer.
> >
> > 3. In the snapshot phase, each operator saves its state.
> >
> > 4. In the notify-checkpoint-complete phase, the committer commits the
> > compacted file information and generates a new snapshot. The next time
> > the compaction coordinator prepares a snapshot, it reads the snapshot
> > delta (and updates its restored files based on the saved snapshot id).
> >
> > Batch mode is used for scenarios such as delete and update. Row kinds
> > such as +U / -U / -D are not used while performing delete and update
> > operations; real file replacement is used instead, and each delete or
> > update operation generates a new snapshot, as sketched below.
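> >
> > As an illustration of what this copy-on-write style of delete/update
> > could look like, here is a hypothetical Java sketch; all names and
> > helpers below are invented for illustration and are not actual Paimon
> > code:
> >
> > import java.util.ArrayList;
> > import java.util.List;
> > import java.util.function.Function;
> > import java.util.function.Predicate;
> >
> > class Row { /* column values elided */ }
> >
> > /** A batch UPDATE rewrites every file that contains a matching row and
> >  *  publishes the replacement in one new snapshot. */
> > class CopyOnWriteUpdate {
> >     private final List<String> dataFiles; // current data files of the table
> >
> >     CopyOnWriteUpdate(List<String> dataFiles) { this.dataFiles = dataFiles; }
> >
> >     void update(Predicate<Row> where, Function<Row, Row> setClause) {
> >         List<String> removed = new ArrayList<>();
> >         List<String> added = new ArrayList<>();
> >         for (String file : dataFiles) {
> >             if (containsMatch(file, where)) {
> >                 // Rewrite the whole file: matching rows are transformed,
> >                 // the others are copied unchanged. No -U/+U/-D records
> >                 // are ever produced.
> >                 added.add(rewriteFile(file, where, setClause));
> >                 removed.add(file);
> >             }
> >         }
> >         // One atomic commit: the new snapshot drops the old files and
> >         // references the rewritten ones.
> >         commitNewSnapshot(added, removed);
> >     }
> >
> >     // Stubs standing in for real file IO and snapshot commit logic.
> >     boolean containsMatch(String file, Predicate<Row> where) { return false; }
> >     String rewriteFile(String f, Predicate<Row> w, Function<Row, Row> s) { return f + ".new"; }
> >     void commitNewSnapshot(List<String> added, List<String> removed) {}
> > }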
> >
> > Please write back with your opinions.
> >
> > Best,
> >
> > Junhao.