Hi Wencong,

I misunderstood the meaning of sort-partition; the name is easily confused
with table partitions.

I suggest we rename 'sink.clustering.sort-partition' to
'sink.clustering.local-sort'.

And I think the default value should be true. Compared to distributed
range sort, local sorting is a low-cost operation, so we should
perform it by default.
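
To make the distinction concrete, here is a minimal Python sketch. It is
not the Paimon implementation; the function names and the boundary values
are made up for illustration. Range partitioning routes records to
partitions by key range, and the local sort then orders records inside
each partition without moving data between partitions.

```python
from bisect import bisect_right


def range_partition(records, boundaries):
    """Route each record to a partition by key range.

    Records inside a partition keep their arrival order.
    """
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for record in records:
        partitions[bisect_right(boundaries, record)].append(record)
    return partitions


def local_sort(partitions):
    """Sort each partition independently; nothing is shuffled across partitions."""
    return [sorted(partition) for partition in partitions]


records = [7, 1, 9, 4, 3, 8, 2]
boundaries = [3, 6]  # hypothetical boundaries: key < 3, 3 <= key < 6, key >= 6

partitioned = range_partition(records, boundaries)
clustered = local_sort(partitioned)
```

After range partitioning alone the partitions do not overlap, but records
inside each one are unordered; the local sort only adds a per-partition
sort on top of that.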

Best,
Jingsong

On Mon, Apr 29, 2024 at 9:42 PM Jingsong Li <[email protected]> wrote:
>
> I second Xintong’s suggestion. We can just make the default value order;
> auto is too early for us now. You can take a look at other systems.
>
> And for sink.clustering.sort-partition:
> Indicates whether to further sort each partition after range partitioning, 
> enhancing data orderliness within each partition.
>
> Maybe adding the partition fields to the range sort is better? We already have a spill
> mechanism to avoid OOM during writing. This does not look very useful. But a range sort
> on partition fields is useful, because it can reduce small files.
>
> On Mon, Apr 29, 2024 at 3:26 PM Xintong Song <[email protected]> wrote:
>>
>> +1 for the proposal in general. The feature should significantly improve
>> the performance of downstream workloads that read data from the tables.
>>
>> I have a few suggestions / questions.
>>
>> 1. For `sink.clustering.by-columns`, I think it would be nice to explicitly
>> mention that not specified (or null) means the feature is not enabled.
>>
>> 2. For `sink.clustering.strategy`, I'd suggest not to expose the behaviors
>> when the value is `auto` to users. For this developer-oriented PIP
>> document, it's important to make the behavior clear so that people can vote
>> on it. But for the user-oriented configuration description, `auto` would
>> simply mean the system would automatically choose a strategy and users
>> don't need to worry about it. Moreover, not exposing the behavior would
>> give us the chance to change it in future if necessary, without breaking
>> any commitment that we made to users.
>>
>> 3. I'd like to understand a bit more about the sampling strategy. In
>> particular, how much data is sampled out of the entire data set? Is this
>> decided by a certain sampling rate, or is the amount of samples fixed
>> regardless of the size of the data set? Should the rate / amount be
>> configurable, or does practice suggest that a hard-coded parameter works
>> fine in most use cases?
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Tue, Apr 23, 2024 at 10:59 PM Wencong Liu <[email protected]> wrote:
>>
>> > Thanks for your reply.
>> > 1. Yes. The LocalSample will receive data emitted by the
>> > Upstream Operator and perform sampling. The
>> > specific sampling algorithm used is reservoir sampling [1].
>> > 2. Assign Range Index will wait until all records have
>> > been consumed by Local Sample and the result
>> > is generated by Global Sample.
>> >
>> > [1] https://arxiv.org/pdf/1903.12065v1.pdf
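
For readers unfamiliar with reservoir sampling, a minimal Python sketch of
the classic single-pass variant (Algorithm R) follows. This is an
assumption for illustration: the thread only says "reservoir sampling", so
the PIP may use a different variant, and the function name and parameters
here are hypothetical.

```python
import random


def reservoir_sample(stream, k, rng=None):
    """Keep a uniform random sample of up to k records from a stream of unknown length."""
    rng = rng or random.Random()
    reservoir = []
    for i, record in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k records.
            reservoir.append(record)
        else:
            # Record i replaces a random slot with probability k / (i + 1).
            j = rng.randint(0, i)  # randint is inclusive on both ends
            if j < k:
                reservoir[j] = record
    return reservoir


sample = reservoir_sample(range(1000), k=10, rng=random.Random(0))
```

The reservoir is a uniform sample of the whole input only once the input
has been fully consumed, which is consistent with Assign Range Index
waiting for all records to pass through Local Sample.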
>> >
>> > At 2024-04-23 20:48:45, "wj wang" <[email protected]> wrote:
>> > >Hi Wencong,
>> > >I have two small questions.
>> > >1. Will all records be emitted from `Upstream Operator` to `Local
>> > >Sample`? If not, what is the sampling rule?
>> > >2. From the PIP, I infer that the records in `Assign Range Index` should
>> > >wait for the broadcast result from `Global Sample`. How long do they
>> > >wait? Until all records have been consumed by `Local Sample`, or not?
>> > >
>> > >Best,
>> > >wangwj
>> > >
>> > >On Mon, Apr 22, 2024 at 6:20 PM Jingsong Li <[email protected]> wrote:
>> > >>
>> > >> +1 for your proposal.
>> > >>
>> > >> You can add to the description.
>> > >>
>> > >> Best,
>> > >> Jingsong
>> > >>
>> > >> On Mon, Apr 22, 2024 at 6:15 PM Wencong Liu <[email protected]> wrote:
>> > >> >
>> > >> > Hi Jingsong,
>> > >> >
>> > >> > This topic requires discussion, hence it wasn't directly addressed in
>> > >> > the PIP.
>> > >> >
>> > >> > I believe the type of sorting algorithm to use depends on the number of
>> > >> > fields specified by the user for comparison. When only one comparison
>> > >> > field is specified, it's best to use basic data types for direct
>> > >> > comparison for the most accurate results. For multiple comparison
>> > >> > fields, both the Z-order curve and Hilbert curve algorithms are
>> > >> > suitable. In such cases, data maintains a certain level of order in any
>> > >> > comparison field. Generally, the computation cost of the Z-order curve
>> > >> > algorithm is lower than that of the Hilbert curve algorithm. However, in
>> > >> > high-dimensional scenarios, the Hilbert curve has an advantage in
>> > >> > maintaining data clustering.
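
As a rough illustration of how a Z-order key keeps some order in every
comparison field, here is a minimal Python sketch of bit interleaving. It
assumes each column value has already been mapped to a non-negative
integer of a fixed bit width; the function name and the `bits` parameter
are hypothetical, and a real implementation has to handle arbitrary column
types.

```python
def z_order_key(values, bits=8):
    """Interleave the bits of several non-negative integers into one Z-order (Morton) key."""
    key = 0
    for bit in range(bits - 1, -1, -1):  # walk from the most significant bit down
        for value in values:
            key = (key << 1) | ((value >> bit) & 1)
    return key


# Sorting by the interleaved key clusters rows that are close in both columns.
rows = [(3, 5), (0, 0), (1, 1), (7, 7), (0, 1)]
rows_by_z_order = sorted(rows, key=lambda row: z_order_key(row, bits=3))
```

Because the high bits of every column land near the top of the key, no
single column dominates the sort the way the first column does in a plain
lexicographic comparison.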
>> > >> >
>> > >> >
>> > >> > Therefore, I propose an automatic selection based on the number of
>> > >> > comparison columns:
>> > >> >
>> > >> > 1 column: Basic type comparison algorithm.
>> > >> > 2 to 4 columns: Z-order curve algorithm.
>> > >> > 5 or more columns: Hilbert curve algorithm.
>> > >> >
>> > >> > The threshold of 5 columns is based on Ververica's practice with Paimon
>> > >> > Append Scalable tables, which was also discussed offline with Junhao Ye.
>> > >> > In addition to automatic configuration, users can fine-tune for specific
>> > >> > scenarios by explicitly specifying the desired comparison strategy.
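
The selection rule proposed above can be sketched in a few lines of
Python. The function name and the returned strings "zorder" and "hilbert"
are assumptions for illustration; only "order" and "auto" are named as
strategy values in this thread.

```python
def choose_strategy(num_columns):
    """Pick a comparison strategy from the number of clustering columns."""
    if num_columns < 1:
        raise ValueError("at least one clustering column is required")
    if num_columns == 1:
        return "order"    # direct comparison on the basic type
    if num_columns < 5:
        return "zorder"   # 2 to 4 columns: cheaper to compute
    return "hilbert"      # 5 or more columns: better clustering


strategies = [choose_strategy(n) for n in range(1, 7)]
```

An explicitly configured strategy would simply bypass this function.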
>> > >> >
>> > >> >
>> > >> > WDYT?
>> > >> >
>> > >> >
>> > >> >
>> > >> > Best,
>> > >> >
>> > >> > Wencong
>> > >> >
>> > >> >
>> > >> > At 2024-04-22 15:08:09, "Jingsong Li" <[email protected]> wrote:
>> > >> > >Hi Wencong,
>> > >> > >
>> > >> > >Mostly looks good to me.
>> > >> > >
>> > >> > >"it will automatically determine the algorithm based on the number of
>> > >> > >columns in 'sink.clustering.by-columns'. "
>> > >> > >
>> > >> > >Please describe this clearly in the `Description`.
>> > >> > >
>> > >> > >Best,
>> > >> > >Jingsong
>> > >> > >
>> > >> > >On Mon, Apr 22, 2024 at 2:36 PM Wencong Liu <[email protected]> wrote:
>> > >> > >>
>> > >> > >> Hi devs,
>> > >> > >>
>> > >> > >> I'm proposing a new feature to introduce range partitioning and sorting in
>> > >> > >> append scalable table writing for Flink. The goal is to optimize query
>> > >> > >> performance by reducing data scans on large datasets.
>> > >> > >>
>> > >> > >> The proposal includes:
>> > >> > >>
>> > >> > >> 1. Configurable range partitioning and sorting during data writing, which
>> > >> > >> allows for a more efficient data distribution strategy.
>> > >> > >>
>> > >> > >> 2. Introduction of new configurations that will enable users to specify
>> > >> > >> columns for comparison, choose a comparison algorithm for range
>> > >> > >> partitioning, and further sort each partition if required.
>> > >> > >>
>> > >> > >> 3. Detailed explanation of the division of processing steps when range
>> > >> > >> partitioning is enabled and the conditional inclusion of the sorting phase.
>> > >> > >>
>> > >> > >> Looking forward to discussing this in the upcoming PIP [1].
>> > >> > >>
>> > >> > >> Best regards,
>> > >> > >> Wencong Liu
>> > >> > >>
>> > >> > >> [1] https://cwiki.apache.org/confluence/display/PAIMON/PIP-21%3A+Introduce+Range+Partition+And+Sort+in+Append+Scalable+Table+Batch+Writing+for+Flink
>> >
