Hi devs,

With no further discussion raised in thread [1], I'd like to start a vote on PIP-21: Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink [2].

The vote will be open for at least 72 hours unless there is an objection or insufficient votes.

Best regards,
Wencong

[1] https://lists.apache.org/thread/pyhnjcq259897sb84fprk1362b1dwcsb
[2] https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink

At 2024-05-09 09:43:39, "Jingsong Li" <[email protected]> wrote:
> Thanks Wencong for updating.
>
> +1
>
> Best,
> Jingsong
>
> On Wed, May 8, 2024 at 11:02 PM Wencong Liu <[email protected]> wrote:
>>
>> Thank you for the responses from Xintong and Jingsong.
>>
>> To Xintong,
>>
>> Q1:
>> The naming conventions for identifiers in Flink SQL are consistent with the SQL standard, meaning identifiers cannot contain spaces or commas, so using commas as separators is not a problem.
>>
>> When this configuration item is not specified or is set to an empty string, the range partition write feature is disabled. I have added this explanation to the description of the configuration item.
>>
>> Q2:
>> Following offline discussion with Jingsong, we agree that the description of the 'sink.clustering.strategy' configuration item should clearly state the rules that the auto mode uses to choose a strategy.
>>
>> Q3:
>> In the current implementation, the total sample size equals the number of range partitions multiplied by 100, which aligns with Spark's implementation. Spark controls the number of samples per range partition through the "spark.sql.execution.rangeExchange.sampleSizePerPartition" parameter, which defaults to 100.
>> Similarly, we can provide a 'sink.clustering.sample-size-in-cluster' parameter to control it.
>>
>> To Jingsong,
>>
>> I agree with your opinion. Renaming the 'sink.clustering.sort-partition' configuration item to 'sink.clustering.sort-in-cluster' makes it easier to understand. The default value should be `true`, and the relevant description has been updated in the PIP.
>>
>> Best,
>> Wencong
>>
>> At 2024-05-06 14:57:05, "Jingsong Li" <[email protected]> wrote:
>>> Hi Wencong,
>>>
>>> I misunderstood the meaning of sort-partition; it is easily confused with table partitions.
>>>
>>> I suggest we change 'sink.clustering.sort-partition' to 'sink.clustering.local-sort'.
>>>
>>> And I think the default value should be true. Compared to distributed range sort, local sorting is a low-cost operation, and we should do it by default.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Apr 29, 2024 at 9:42 PM Jingsong Li <[email protected]> wrote:
>>>>
>>>> I second Xintong's suggestion: we can simply make the default value `order`; `auto` is too early for us now. You can take a look at other systems.
>>>>
>>>> And for sink.clustering.sort-partition:
>>>> Indicates whether to further sort each partition after range partitioning, enhancing data orderliness within each partition.
>>>>
>>>> Maybe adding partition fields to the range sort is better? We already have a spill mechanism to avoid OOM when writing, so this does not look very useful. But range sorting by partition fields is useful, because it can reduce small files.
>>>>
>>>> On Mon, Apr 29, 2024 at 15:26, Xintong Song <[email protected]> wrote:
>>>>>
>>>>> +1 for the proposal in general. The feature should significantly improve the performance of downstream workloads that read data from the tables.
>>>>>
>>>>> I have a few suggestions / questions.
>>>>>
>>>>> 1. For `sink.clustering.by-columns`, I think it would be nice to explicitly mention that leaving it unspecified (or null) means the feature is not enabled.
>>>>>
>>>>> 2. For `sink.clustering.strategy`, I'd suggest not exposing to users the behavior of the `auto` value. For this developer-oriented PIP document, it's important to make the behavior clear so that people can vote on it. But for the user-oriented configuration description, `auto` would simply mean the system automatically chooses a strategy and users don't need to worry about it. Moreover, not exposing the behavior would give us the chance to change it in the future if necessary, without breaking any commitment that we made to users.
>>>>>
>>>>> 3. I'd like to understand a bit more about the sampling strategy. In particular, how much data is sampled out of the entire data set? Is this decided by a certain sampling rate, or is the number of samples fixed regardless of the size of the data set? Should the rate / amount be configurable, or do any practices suggest that a hard-coded parameter works fine in most use cases?
>>>>>
>>>>> Best,
>>>>> Xintong
>>>>>
>>>>> On Tue, Apr 23, 2024 at 10:59 PM Wencong Liu <[email protected]> wrote:
>>>>>
>>>>>> Thanks for your reply.
>>>>>> 1. Yes. The Local Sample operator receives the data emitted by the Upstream Operator and performs sampling. The specific sampling algorithm used is reservoir sampling [1].
>>>>>> 2. Assign Range Index will wait until all records have been consumed by Local Sample and the result has been generated by Global Sample.
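The flow described above (Local Sample feeding Global Sample, and Assign Range Index blocking until the global result exists) can be sketched in a single Python process. This is a minimal sketch under stated assumptions, not Paimon's implementation: keys are plain integers, the per-range sample size of 100 mirrors the total of "ranges x 100" mentioned in this thread, and all function names are hypothetical. It uses the random-priority variant of reservoir sampling (keep the k records with the smallest random priorities), chosen here because local reservoirs then merge exactly into a uniform global sample.

```python
import heapq
import random
from bisect import bisect_right
from typing import Iterable, List, Tuple

SAMPLE_SIZE_PER_RANGE = 100  # assumption: mirrors the default of 100 discussed in the thread


def local_sample(records: Iterable[int], k: int, rng: random.Random) -> List[Tuple[float, int]]:
    """Reservoir sampling by random priority: keep the k records with the smallest priorities."""
    heap: List[Tuple[float, int]] = []  # max-heap on priority, via negated priorities
    for record in records:
        priority = rng.random()
        if len(heap) < k:
            heapq.heappush(heap, (-priority, record))
        elif priority < -heap[0][0]:
            heapq.heapreplace(heap, (-priority, record))  # evict the largest priority
    return [(-neg, record) for neg, record in heap]


def global_sample(samples: List[List[Tuple[float, int]]], k: int) -> List[int]:
    """The k smallest priorities across all local samples are a uniform sample of all records."""
    merged = heapq.nsmallest(k, (item for sample in samples for item in sample))
    return [record for _, record in merged]


def range_bounds(sample: List[int], num_ranges: int) -> List[int]:
    """Pick num_ranges - 1 boundaries at evenly spaced quantiles of the sorted sample."""
    ordered = sorted(sample)
    return [ordered[len(ordered) * i // num_ranges] for i in range(1, num_ranges)]


def assign_range_index(key: int, bounds: List[int]) -> int:
    """Range index = number of boundaries <= key, so it lies in [0, num_ranges)."""
    return bisect_right(bounds, key)


rng = random.Random(42)
num_ranges = 4
k = num_ranges * SAMPLE_SIZE_PER_RANGE  # total sample size = ranges x samples per range
subtasks = [range(0, 5000), range(5000, 10000)]  # two hypothetical upstream subtasks
samples_per_subtask = [local_sample(part, k, rng) for part in subtasks]
# Blocking step: range assignment starts only after every local sample is done
# and the global sample has produced the boundaries.
bounds = range_bounds(global_sample(samples_per_subtask, k), num_ranges)
counts = [0] * num_ranges
for part in subtasks:
    for key in part:
        counts[assign_range_index(key, bounds)] += 1
print(bounds, counts)
```

Because the boundaries come from a fixed-size sample rather than a sampling rate, the cost of the sampling stage stays bounded regardless of input size, which is the trade-off raised in Xintong's third question.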
>>>>>>
>>>>>> [1] https://arxiv.org/pdf/1903.12065v1.pdf
>>>>>>
>>>>>> At 2024-04-23 20:48:45, "wj wang" <[email protected]> wrote:
>>>>>>> Hi Wencong,
>>>>>>> I have two small questions.
>>>>>>> 1. Will all records be emitted from `Upstream Operator` to `Local Sample`? If not, what is the sampling rule?
>>>>>>> 2. From the PIP, I infer that records in `Assign Range Index` should wait for the broadcast result from `Global Sample`. So how long do they wait? Until all records have been consumed by `Local Sample`, or not?
>>>>>>>
>>>>>>> Best,
>>>>>>> wangwj
>>>>>>>
>>>>>>> On Mon, Apr 22, 2024 at 6:20 PM Jingsong Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>> +1 for your proposal.
>>>>>>>>
>>>>>>>> You can add it to the description.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Mon, Apr 22, 2024 at 6:15 PM Wencong Liu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Jingsong,
>>>>>>>>>
>>>>>>>>> This topic requires discussion, hence it wasn't directly addressed in the PIP.
>>>>>>>>>
>>>>>>>>> I believe the choice of sorting algorithm depends on the number of fields the user specifies for comparison. When only one comparison field is specified, it's best to compare values of the basic data type directly, which gives the most accurate ordering. For multiple comparison fields, both the Z-order curve and Hilbert curve algorithms are suitable. With these curves, the data keeps a certain degree of order on every comparison field.
>>>>>>>>> Generally, the computation cost of the Z-order curve algorithm is lower than that of the Hilbert curve algorithm. However, in high-dimensional scenarios, the Hilbert curve is better at preserving data clustering.
>>>>>>>>>
>>>>>>>>> Therefore, I propose an automatic selection based on the number of comparison columns:
>>>>>>>>>
>>>>>>>>> 1 column: basic type comparison algorithm.
>>>>>>>>> Fewer than 5 columns: Z-order curve algorithm.
>>>>>>>>> 5 or more columns: Hilbert curve algorithm.
>>>>>>>>>
>>>>>>>>> The threshold of 5 columns is based on Ververica's practice with Paimon Append Scalable tables, which was also discussed offline with Junhao Ye. In addition to the automatic selection, users can fine-tune for specific scenarios by explicitly specifying the desired comparison strategy.
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wencong
>>>>>>>>>
>>>>>>>>> At 2024-04-22 15:08:09, "Jingsong Li" <[email protected]> wrote:
>>>>>>>>>> Hi Wencong,
>>>>>>>>>>
>>>>>>>>>> Mostly looks good to me.
>>>>>>>>>>
>>>>>>>>>> "it will automatically determine the algorithm based on the number of columns in 'sink.clustering.by-columns'."
>>>>>>>>>>
>>>>>>>>>> Please describe this clearly in the `Description`.
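The column-count rule proposed above can be written as a small decision function, and the Z-order option boils down to interleaving the bits of the clustering columns. This Python sketch is illustrative only: the strategy strings 'zorder' and 'hilbert' are assumptions (this thread only names `order` and `auto`), keys are assumed to be non-negative integers of a fixed bit width, and the function names are hypothetical. A Hilbert key is deliberately left out, since a correct Hilbert mapping needs far more code than the rule itself.

```python
from typing import Sequence

HILBERT_THRESHOLD = 5  # the 5-column threshold from the proposal


def choose_strategy(by_columns: Sequence[str]) -> str:
    """Auto mode as proposed: 1 column -> order, fewer than 5 -> zorder, 5 or more -> hilbert."""
    n = len(by_columns)
    if n == 0:
        raise ValueError("auto mode needs at least one clustering column")
    if n == 1:
        return "order"
    return "zorder" if n < HILBERT_THRESHOLD else "hilbert"


def z_order_key(values: Sequence[int], bits: int = 16) -> int:
    """Interleave bits: bit b of column c lands at position b * n + c of the key."""
    for v in values:
        if not 0 <= v < (1 << bits):
            raise ValueError(f"value {v} does not fit in {bits} bits")
    n = len(values)
    key = 0
    for b in range(bits):
        for c, v in enumerate(values):
            key |= ((v >> b) & 1) << (b * n + c)
    return key


print(choose_strategy(["user_id"]))                # order
print(choose_strategy(["a", "b", "c"]))            # zorder
print(choose_strategy(["a", "b", "c", "d", "e"]))  # hilbert
# Points that are close in both dimensions tend to receive nearby keys.
grid = sorted(((x, y) for x in range(4) for y in range(4)), key=lambda p: z_order_key(p, bits=2))
print(grid[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Sorting by the interleaved key keeps each 2x2 block of the grid contiguous, which is why a Z-order range partition preserves some order on every comparison column rather than only on the first one.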
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 22, 2024 at 2:36 PM Wencong Liu <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi devs,
>>>>>>>>>>>
>>>>>>>>>>> I'm proposing a new feature to introduce range partitioning and sorting in append scalable table writing for Flink. The goal is to optimize query performance by reducing data scans on large datasets.
>>>>>>>>>>>
>>>>>>>>>>> The proposal includes:
>>>>>>>>>>>
>>>>>>>>>>> 1. Configurable range partitioning and sorting during data writing, which allows for a more efficient data distribution strategy.
>>>>>>>>>>>
>>>>>>>>>>> 2. Introduction of new configurations that will enable users to specify columns for comparison, choose a comparison algorithm for range partitioning, and further sort each partition if required.
>>>>>>>>>>>
>>>>>>>>>>> 3. A detailed explanation of how processing is divided into steps when range partitioning is enabled, and of the conditional inclusion of the sorting phase.
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to discussing this in the upcoming PIP [1].
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Wencong Liu
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink
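The configuration semantics settled in this thread can be collected in one place: an unset or empty `sink.clustering.by-columns` disables the feature, its value is split on commas, `sink.clustering.sort-in-cluster` defaults to `true`, and `sink.clustering.sample-size-in-cluster` sets the samples per range (100 by default, per the Q3 answer). The Python sketch below assumes a plain dict of string options; the `ClusteringConfig` type, the `parse_clustering_options` helper, and the fallback to `auto` when no strategy is given are hypothetical and do not reflect Paimon's actual API or final defaults.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ClusteringConfig:
    # Hypothetical holder for the options discussed in the PIP-21 thread.
    columns: List[str]
    strategy: str
    sort_in_cluster: bool
    sample_size_in_cluster: int

    def total_sample_size(self, num_ranges: int) -> int:
        # Total samples = number of range partitions x samples per range.
        return num_ranges * self.sample_size_in_cluster


def parse_clustering_options(options: Dict[str, str]) -> Optional[ClusteringConfig]:
    """Return None when range-partitioned writing is disabled."""
    raw = options.get("sink.clustering.by-columns", "").strip()
    if not raw:
        # Unset or empty string: the feature is not enabled.
        return None
    # Identifiers cannot contain commas, so a plain split is sufficient.
    columns = [c.strip() for c in raw.split(",") if c.strip()]
    return ClusteringConfig(
        columns=columns,
        strategy=options.get("sink.clustering.strategy", "auto"),  # assumed fallback
        sort_in_cluster=options.get("sink.clustering.sort-in-cluster", "true").lower() == "true",
        sample_size_in_cluster=int(options.get("sink.clustering.sample-size-in-cluster", "100")),
    )


cfg = parse_clustering_options({"sink.clustering.by-columns": "user_id, event_time"})
print(cfg)
print(parse_clustering_options({}))  # None: the feature is disabled
```

Returning `None` for the disabled case keeps the "not specified means not enabled" rule from Xintong's first point explicit at the call site, instead of relying on an empty column list.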
