Thanks Wencong for updating. +1
Best, Jingsong On Wed, May 8, 2024 at 11:02 PM Wencong Liu <[email protected]> wrote: > > Thank you for the responses from Xintong and Jinsong. > > > > > To Xintong, > > > Q1: > > The naming conventions for identifiers in Flink SQL are consistent > > with the SQL standard, meaning identifiers cannot contain spaces or > > commas, so using commas as separators is not a problem. > > > > > When this configuration item is not specified or is specified as an empty > string, > > it means that the range partition write feature is not enabled. > > I have added the related explanation in the description of the configuration > item. > > > > > > Q2: > > Following our offline discussions with Jinsong, we all agree that it is > > necessary to clearly state the specific judgment rules corresponding to > > the auto mode in the description of the 'sink.clustering.strategy' > > configuration item. > > > > > > Q3: > > In the current implementation, the total sample size equals the number > > of range partitions times 100, which aligns with Spark's implementation. Spark > > allows controlling the number of samples per range partition through the > > "spark.sql.execution.rangeExchange.sampleSizePerPartition" parameter, with a > > default of 100. Similarly, we can provide a > 'sink.clustering.sample-size-in-cluster' > > parameter for control. > > > > > To Jinsong, > > I agree with your option. Renaming the 'sink.clustering.sort-partition' > > configuration item to 'sink.clustering.sort-in-cluster' makes it > > easier to understand. The default value should be `true`, and the > > relevant description has been updated in the PIP. > > > > > Best, > > Wencong > > > > > > At 2024-05-06 14:57:05, "Jingsong Li" <[email protected]> wrote: > >Hi Wencong, > > > >I misunderstood the meaning of sort-partition, it is very confusing to > >table partition. > > > >I suggest we can change 'sink.clustering.sort-partition' to > >'sink.clustering.local-sort'. > > > >And I think the default value should be true. Compared to distributed > >range sort, local sorting is a low-cost behavior and we should > >complete it by default. > > > >Best, > >Jingsong > > > >On Mon, Apr 29, 2024 at 9:42 PM Jingsong Li <[email protected]> wrote: > >> > >> I second Xintong’s suggestion, we can just let the default value is order, > >> auto is too early for us now, you can take a look to other systems. > >> > >> And for sink.clustering.sort-partition: > >> Indicates whether to further sort each partition after range partitioning, > >> enhancing data orderliness within each partition. > >> > >> Maybe adding partition fields to range sort is better? We already have > >> spill mechanism to avoid OOM in writing. This looks not so useful. But, > >> range sort to partition fields is useful. Can reduce small files. > >> > >> Xintong Song <[email protected]>于2024年4月29日 周一15:26写道: > >>> > >>> +1 for the proposal in general. The feature should significantly improve > >>> the performance that downstream workloads read data from the tables. > >>> > >>> I have a few suggestions / questions. > >>> > >>> 1. For `sink.clustering.by-columns`, I think it would be nice to > >>> explicitly > >>> mention that not specified (or null) means the feature is not enabled. > >>> > >>> 2. For `sink.clustering.strategy`, I'd suggest not to expose the behaviors > >>> when the value is `auto` to users. For this developer-oriented PIP > >>> document, it's important to make the behavior clear so that people can > >>> vote > >>> on it. But for the user-oriented configuration description, `auto` would > >>> simply mean the system would automatically choose a strategy and users > >>> don't need to worry about it. Moreover, not exposing the behavior would > >>> give us the chance to change it in future if necessary, without breaking > >>> any commitment that we made to users. > >>> > >>> 3. I'd like to understand a bit more about the sampling strategy. In > >>> particular, how much data is sampled out of the entire data set? Is this > >>> decided by a certain sampling rate, or is the amount of samples fixed > >>> regardless of the size of the data set? Should the rate / amount be > >>> configurable, or any practices suggest that a hard-coded parameter works > >>> fine in most use cases? > >>> > >>> Best, > >>> > >>> Xintong > >>> > >>> > >>> > >>> On Tue, Apr 23, 2024 at 10:59 PM Wencong Liu <[email protected]> wrote: > >>> > >>> > Thanks for your reply. > >>> > 1.Yes. The LocalSample will receive data emitted by the > >>> > Upstream Operator and perform sampling. The > >>> > specific sampling algorithm used is reservoir sampling [1]. > >>> > 2. Assign Range Index will wait until all records have > >>> > been consumed by Local Sample and the result > >>> > is generated by Global Sample. > >>> > > >>> > [1] https://arxiv.org/pdf/1903.12065v1.pdf > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > At 2024-04-23 20:48:45, "wj wang" <[email protected]> wrote: > >>> > >Hi,Wencong > >>> > >I have two small questions. > >>> > >1. Add record will be emitted from `Upstream Operator` to `Local > >>> > >Sample`? If not, what is the sample rule? > >>> > >2. From pip, I infer that the record in `Assign Range Index` should > >>> > >wait for the broadcast result from `Global Sample`,So How long do they > >>> > >wait? Until all records have been consumed by `Local Sample` or not? > >>> > > > >>> > >Best, > >>> > >wangwj > >>> > > > >>> > >On Mon, Apr 22, 2024 at 6:20 PM Jingsong Li <[email protected]> > >>> > wrote: > >>> > >> > >>> > >> +1 for your proposal. > >>> > >> > >>> > >> You can add to the description. > >>> > >> > >>> > >> Best, > >>> > >> Jingsong > >>> > >> > >>> > >> On Mon, Apr 22, 2024 at 6:15 PM Wencong Liu <[email protected]> > >>> > wrote: > >>> > >> > > >>> > >> > Hi Jinsong, > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > This topic requires discussion, hence it wasn't directly addressed > >>> > >> > in > >>> > the PIP. > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > I believe the type of sorting algorithm to use depends on the > >>> > >> > number > >>> > of > >>> > >> > fields specified by the user for comparison. When only one > >>> > >> > comparison > >>> > field is > >>> > >> > specified, it's best to use basic data types for direct comparison > >>> > for the most accurate > >>> > >> > results. For multiple comparison fields, both the Z-order curve and > >>> > Hilbert curve algorithms > >>> > >> > are suitable. In such cases, data maintains a certain level of > >>> > >> > order > >>> > in any comparison > >>> > >> > field. Generally, the computation cost of the Z-order curve > >>> > >> > algorithm > >>> > is lower > >>> > >> > than that of the Hilbert curve algorithm. However, in > >>> > >> > high-dimensional > >>> > >> > scenarios, the Hilbert curve has an advantage in maintaining data > >>> > clustering. > >>> > >> > > >>> > >> > > >>> > >> > Therefore, I propose an automatic selection based on the number of > >>> > >> > comparison columns: > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > 1 column: Basic type comparison algorithm. > >>> > >> > > >>> > >> > Less than 5 columns: Z-order curve algorithm. > >>> > >> > > >>> > >> > 5 or more columns: Hilbert curve algorithm. > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > The threshold of 5 columns is based on Ververica's practice with > >>> > Paimon > >>> > >> > Append Scalable tables, which was also discussed offline with > >>> > >> > Junhao > >>> > Ye. > >>> > >> > In addition to automatic configuration, users can fine-tune for > >>> > specific > >>> > >> > scenarios by explicitly specifying the desired comparison strategy. > >>> > >> > > >>> > >> > > >>> > >> > WDYT? > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > Best, > >>> > >> > > >>> > >> > Wencong > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> > At 2024-04-22 15:08:09, "Jingsong Li" <[email protected]> > >>> > >> > wrote: > >>> > >> > >Hi Wencong, > >>> > >> > > > >>> > >> > >Mostly looks good to me. > >>> > >> > > > >>> > >> > >"it will automatically determine the algorithm based on the > >>> > >> > >number of > >>> > >> > >columns in 'sink.clustering.by-columns'. " > >>> > >> > > > >>> > >> > >Please describe this clearly in the `Description`. > >>> > >> > > > >>> > >> > >Best, > >>> > >> > >Jingsong > >>> > >> > > > >>> > >> > >On Mon, Apr 22, 2024 at 2:36 PM Wencong Liu <[email protected]> > >>> > wrote: > >>> > >> > >> > >>> > >> > >> Hi devs, > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> I'm proposing a new feature to introduce range partitioning and > >>> > sorting in append scalable table > >>> > >> > >> > >>> > >> > >> writing for Flink. The goal is to optimize query performance by > >>> > reducing data scans on large datasets. > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> The proposal includes: > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> 1. Configurable range partitioning and sorting during data > >>> > >> > >> writing > >>> > which allows for > >>> > >> > >> > >>> > >> > >> a more efficient data distribution strategy. > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> 2. Introduction of new configurations that will enable users to > >>> > specify columns for > >>> > >> > >> > >>> > >> > >> comparison, choose a comparison algorithm for range > >>> > >> > >> partitioning, > >>> > and further sort each > >>> > >> > >> > >>> > >> > >> partition if required. > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> 3. Detailed explanation of the division of processing steps when > >>> > range partitioning > >>> > >> > >> > >>> > >> > >> is enabled and the conditional inclusion of the sorting phase. > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> Looking forward to discussing this in the upcoming PIP [1]. > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> Best regards, > >>> > >> > >> > >>> > >> > >> Wencong Liu > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> > >> [1] > >>> > https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink > >>> >
