Thanks Junhao.

I noticed that you have created a PIP [1]. Could you update your PIP to
follow the PIP Template [2]? This would help us understand your proposal
better.

[1]
https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket?src=contextnavpagetreemode
[2] https://cwiki.apache.org/confluence/display/PAIMON/PIP+Template

Best,
Shammon FY

On Mon, May 22, 2023 at 9:14 PM Jingsong Li <[email protected]> wrote:

> Thanks Junhao for your proposal,
>
> Could you send a PIP discussion email in the formal format? Just like
> https://lists.apache.org/thread/dyq1jyrj8cqb26z84bhqr7xx1pn7ctj8
>
> And the content of the PIP could be reworked into the formal format too.
>
> Best,
> Jingsong
>
> On Mon, May 22, 2023 at 4:49 PM JUNHAO YE <[email protected]> wrote:
>
> > Hi guys,
> > I want to start a conversation about improving Paimon batch processing.
> > See
> >
> https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket
> >
> > *PREFACE:*
> >
> > Currently, Paimon has strong support for streaming write and streaming
> > read, but not enough for traditional batch processing. When creating a
> > table, you need to explicitly specify the bucket key and bucket number;
> > otherwise, an AppendOnly table or changelog table with a single bucket is
> > created. With only one bucket, concurrent read/write and compaction
> > cannot be performed, resulting in poor batch performance.
> > In a traditional offline data warehouse, users do not care about buckets
> > and do not want to care about them. For offline batch computing, the
> > bucket concept, which Paimon designed for real-time stream reading and
> > writing, does not apply. Without streaming reads and writes, the record
> > order guaranteed by sequence.field does not mean much either: without
> > streaming retract, only concurrent inserts flow into the table, and the
> > rest of the processing is done in batch mode.
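> >
> > For reference, a bucketed table today must be declared roughly as
> > follows. This is a minimal sketch only: it assumes a Flink
> > TableEnvironment with a Paimon catalog already configured, and the
> > 'bucket' / 'bucket-key' option names follow the current documentation:
> >
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.TableEnvironment;
> >
> > public class BucketedTableExample {
> >     public static void main(String[] args) {
> >         TableEnvironment env =
> >                 TableEnvironment.create(EnvironmentSettings.inBatchMode());
> >         // Both options must be chosen up front; without them an
> >         // AppendOnly/changelog table with a single bucket is created.
> >         env.executeSql(
> >                 "CREATE TABLE Orders_bucketed ("
> >                         + "  order_id INT,"
> >                         + "  price    INT"
> >                         + ") WITH ("
> >                         + "  'bucket' = '4',"
> >                         + "  'bucket-key' = 'order_id'"
> >                         + ")");
> >     }
> > }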
> >
> > Therefore, we need to design a table that meets the following
> > characteristics:
> > 1. There is no concept of a primary key or bucket key; it is equivalent
> > to a plain table in an offline data warehouse.
> > 2. In stream mode, only inserts are supported (only "I"-type records).
> > As an offline table, data can be inserted synchronously in real time,
> > but there is no need for real-time delete, update, etc.
> > 3. Streaming write supports concurrent writes and concurrent compaction.
> > 4. Data is updated and deleted through batch jobs.
> > 5. Read order is not guaranteed to follow write order.
> >
> >
> > *SCENARIO:*
> >
> > The customer has a large amount of order transaction data and expects
> > it to flow into the offline table automatically as soon as transaction
> > data arrives, with batch statistics jobs running at 12 o'clock every
> > day.
> > Combined with the Flink computing engine, we can create a table when
> > only an offline batch table is needed:
> >
> > *CREATE TABLE Orders (
> >     order_id        INT,
> >     order_type      STRING,
> >     `date`          TIMESTAMP,
> >     price           INT,
> >     number          INT
> > ) PARTITIONED BY (order_type)
> > WITH (
> > 'write-mode' = 'table'
> > );*
> >
> >
> > There is no bucket-related property at all.
> >
> > We want this table to be used for offline OLAP analysis, such as
> > once-a-day statistics. But its data volume and traffic are large, so we
> > want it to keep ingesting new data on its own:
> >
> > *INSERT INTO Orders
> >       SELECT * FROM OrderSource;*
> >
> >
> > Run an offline analysis query to calculate the total sales in one day:
> >
> > *SELECT sum(price * number) FROM Orders GROUP BY DATE_FORMAT(`date`,
> > 'yyyyMMdd');*
> >
> >
> > Count the number of transactions per order type:
> >
> > *SELECT order_type, COUNT(*) FROM Orders GROUP BY order_type;*
> >
> >
> > The order type has changed, and all historical data needs to be changed
> > (flink-version >= 1.17):
> >
> > *UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND
> > `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-MM-dd');*
> >
> >
> > *DESIGN:*
> >
> > At the Paimon project level, we need a new table and a new write mode.
> > In this mode:
> >
> > 1. +I data only; no +U, -U, or -D row kinds.
> >
> > 2. All data goes to the default bucket=0 file directory (for consistency
> with the previous design).
> >
> > 3. No sequence.number; ordered reads and writes are not required.
> >
> > 4. Separate compaction from the writer, so the writer is no longer
> > responsible for compaction. This solves the problem of compacting a
> > single bucket that is written to concurrently (write-only=true by
> > default when creating the table).
> >
> > 5. Introduce two new components: CompactionCoordinator and
> > CompactionWorker. The CompactionCoordinator is a single-parallelism
> > coordinator that receives the files written by upstream writers. The
> > CompactionWorker is a multi-parallelism compaction executor that runs
> > only the compaction tasks assigned by the coordinator, as sketched
> > below.
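> >
> > A rough sketch of how the two roles could be split (the class names come
> > from this proposal; the method names, stub types, and signatures are
> > illustrative assumptions, not a final API):
> >
> > import java.util.List;
> >
> > // Stub types for the sketch; the real implementation would use the
> > // project's own file-meta / commit-message classes.
> > class DataFileMeta {}
> > class CompactionTask { List<DataFileMeta> inputs; }
> > class CommitMessage {}
> >
> > /** Parallelism 1: collects new files and plans compactions. */
> > interface CompactionCoordinator {
> >     /** Receive the files flushed by upstream writers. */
> >     void notifyNewFiles(List<DataFileMeta> newFiles);
> >
> >     /** Decide, by some strategy, which files to compact together. */
> >     List<CompactionTask> planCompactions();
> > }
> >
> > /** Parallelism N: only executes tasks planned by the coordinator. */
> > interface CompactionWorker {
> >     /** Rewrite the task's input files and return a commit message. */
> >     CommitMessage execute(CompactionTask task);
> > }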
> >
> >
> > At the computing engine level, we build the following topology when
> writing in real time:
> >
> > [image: image.png]
> >
> >
> > [image: 粘贴的图形-2.tiff]
> >
> > 1. In the prepareSnapshot phase, the writers flush their new files and
> > the compaction coordinator receives them; the coordinator also reads the
> > delta files of the latest snapshot and adds them to its restored files.
> >
> >    Then, based on a strategy, it decides whether to create a compaction
> > task.
> >
> >
> > 2. The compaction coordinator delivers compaction tasks. Each
> > compaction worker executes the tasks assigned to it, compacting the
> > files flushed by the writers.
> >
> >    After execution, it builds a commit message and passes it to the
> > downstream committer.
> >
> >
> > 3. In the snapshot stage, each operator saves its state information.
> >
> >
> > 4. In the notifyCheckpointComplete phase, the committer commits the
> > compacted file information and generates a new snapshot. The next time
> > the compaction coordinator prepares a snapshot,
> >
> >    it reads the snapshot delta (and updates its restored files based on
> > the saved snapshot id). A sketch of this loop follows below.
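> >
> > Put together, the coordinator's prepareSnapshot step might look roughly
> > like this (a sketch only, reusing the stub types from the previous
> > sketch; all names and the strategy interface are assumptions):
> >
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > class CoordinatorSketch {
> >     interface Strategy { List<CompactionTask> plan(List<DataFileMeta> f); }
> >     interface Snapshot { long id(); List<DataFileMeta> deltaFiles(); }
> >
> >     private final Strategy strategy;
> >     private final List<DataFileMeta> restoredFiles = new ArrayList<>();
> >     private long lastSeenSnapshotId = -1L;
> >
> >     CoordinatorSketch(Strategy strategy) { this.strategy = strategy; }
> >
> >     /** Called once per checkpoint, before the operator state snapshot. */
> >     List<CompactionTask> prepareSnapshot(List<DataFileMeta> filesFromWriters,
> >                                          Snapshot latest) {
> >         // Files just flushed by the upstream writers.
> >         restoredFiles.addAll(filesFromWriters);
> >         // Delta files committed since we last looked at a snapshot.
> >         if (latest != null && latest.id() > lastSeenSnapshotId) {
> >             restoredFiles.addAll(latest.deltaFiles());
> >             lastSeenSnapshotId = latest.id();
> >         }
> >         // A pluggable strategy decides whether to compact now.
> >         List<CompactionTask> tasks = strategy.plan(restoredFiles);
> >         for (CompactionTask t : tasks) {
> >             restoredFiles.removeAll(t.inputs); // handed off to workers
> >         }
> >         return tasks;
> >     }
> > }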
> >
> >
> >
> > For scenarios such as delete and update, batch mode is used. Row kinds
> > such as +U, -U, and -D are not used when deleting and updating; instead,
> > the affected files are actually replaced.
> >
> > After each delete or update operation, a new snapshot is generated.
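> >
> > Usage-wise, this keeps the familiar batch statements. A minimal sketch,
> > assuming Flink >= 1.17 with a Paimon catalog already configured (the
> > filter values are illustrative):
> >
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.TableEnvironment;
> >
> > public class BatchMaintenance {
> >     public static void main(String[] args) {
> >         // In batch mode, UPDATE / DELETE rewrite the affected files and
> >         // commit a new snapshot instead of emitting -U/+U/-D rows.
> >         TableEnvironment env =
> >                 TableEnvironment.create(EnvironmentSettings.inBatchMode());
> >         env.executeSql(
> >                 "UPDATE Orders SET order_type = 'banana'"
> >                         + " WHERE order_type = 'apple'");
> >         env.executeSql("DELETE FROM Orders WHERE price < 0");
> >     }
> > }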
> >
> >
> >
> > Please share your opinions.
> >
> >
> >
> > Best,
> >
> > Junhao.
> >
> >
>
