Hi everyone,

Thanks for your valuable feedback!

The discussion on this FLIP has been going on for a while.
I would like to start a vote after 48 hours.

Please let me know if you have any concerns or any further
questions/comments.

Regards,
Jeyhun


On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Lorenzo,
>
> Thanks a lot for your comments. Please find my answers below:
>
>
> For the interface `SupportsPartitioning`, why returning `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return and empty list). Returning `Optional` seem just to complicate the
>> logic of the code using that interface.
>
>
> - The reasoning behind the use of Optional is that sometimes (e.g., in
> HiveTableSource) the partitioning info is in catalog.
>   Therefore, we return Optional.empty(), so that the list of partitions is
> queried from the catalog.
>
>
> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>
>
> - Yes, once partitions cannot be found, neither from catalog nor from the
> interface implementation, then we raise an exception during query compile
> time.
>
>
>  Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>
>
> - Yes, the FLIP contains just an example partitioning for filesystem
> connector. Each connector already "knows" about autodiscovery of its
> partitions. And we rely on this fact.
>   For example, partition discovery is different between kafka and
> filesystem sources. So, we do not handle the manual discovery of
> partitions. Please correct me if I misunderstood your question.
>
>
> In other terms, it looks a bit counterintuitive that the user implementing
>> the source has to specify which partitions exist statically (and they can
>> change at runtime), while the source itself knows the data provider and can
>> directly implement a method `discoverPartitions`. Then Flink would take
>> care of invoking that method when needed.
>
>
> We utilize table option SOURCE_MONITOR_INTERVAL to check whether
> partitions are static or not. So, a user still should give Flink a hint
> about partitions being static or not. With static partitions Flink can do
> more optimizations.
>
> Please let me know if my replies answer your questions or if you have more
> comments.
>
> Regards,
> Jeyhun
>
>
>
> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com> wrote:
>
>> Hello Jeyhun,
>> I really like the proposal and definitely makes sense to me.
>>
>> I have a couple of nits here and there:
>>
>> For the interface `SupportsPartitioning`, why returning `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return and empty list). Returning `Optional` seem just to complicate the
>> logic of the code using that interface.
>>
>> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>>
>> Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>>
>> In other terms, it looks a bit counterintuitive that the user
>> implementing the source has to specify which partitions exist statically
>> (and they can change at runtime), while the source itself knows the data
>> provider and can directly implement a method `discoverPartitions`. Then
>> Flink would take care of invoking that method when needed.
>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com>,
>> wrote:
>>
>> Hi Benchao,
>>
>> Thanks for your comments.
>>
>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
>>
>> if we cannot have a good greatest common divisor, like 127 + 128,
>> could we just utilize one side's pre-partitioned attribute, and let
>> another side just do the shuffle?
>>
>>
>>
>> There are two cases we need to consider:
>>
>> 1. Static Partition (no partitions are added during the query execution)
>> is
>> enabled AND both sources implement "SupportsPartitionPushdown"
>>
>> In this case, we are sure that no new partitions will be added at runtime.
>> So, we have a chance equalize both sources' partitions and parallelism,
>> IFF
>> both sources implement "SupportsPartitionPushdown" interface.
>> To achieve so, first we will fetch the existing partitions from source1
>> (say p_s1) and from source2 (say p_s2).
>> Then, we find the intersection of these two partition sets (say
>> p_intersect) and pushdown these partitions:
>>
>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
>> only specific partitions are read
>> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned
>> read
>> with filtered partitions
>>
>> Lastly, we need to change the parallelism of 1) source1, 2) source2, and
>> 3)
>> all of their downstream operators until (and including) their first common
>> ancestor (e.g., join) to be equal to the number of partitions (size of
>> p_intersect).
>>
>> 2. All other cases
>>
>> In all other cases, the parallelism of both sources and their downstream
>> operators until their common ancestor would be equal to the MIN(p_s1,
>> p_s2).
>> That is, minimum of the partition size of source1 and partition size of
>> source2 will be selected as the parallelism.
>> Coming back to your example, if source1 parallelism is 127 and source2
>> parallelism is 128, then we will first check the partition size of source1
>> and source2.
>> Say partition size of source1 is 100 and partition size of source2 is 90.
>> Then, we would set the parallelism for source1, source2, and all of their
>> downstream operators until (and including) the join operator
>> to 90 (min(100, 90)).
>> We also plan to implement a cost based decision instead of the rule-based
>> one (the ones explained above - MIN rule).
>> One possible result of the cost based estimation is to keep the partitions
>> on one side and perform the shuffling on another source.
>>
>>
>>
>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
>>
>> we don't consider parallelism, we just remove unnecessary shuffles
>> according to the distribution columns. After this FLIP, the
>> parallelism may be bundled with source's partitions, then how will
>> this optimization accommodate with FlinkExpandConversionRule, will you
>> also change downstream operator's parallelisms if we want to also
>> remove subsequent shuffles?
>>
>>
>>
>>
>> - From my understanding of FlinkExpandConversionRule, its removal logic is
>> agnostic to operator parallelism.
>> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
>> then this FLIP will search another possible shuffle (the one closest to
>> the
>> source) to remove.
>> If there is such an opportunity, this FLIP will remove the shuffle. So,
>> from my understanding FlinkExpandConversionRule and this optimization rule
>> can work together safely.
>> Please correct me if I misunderstood your question.
>>
>>
>>
>> Regarding the new optimization rule, have you also considered to allow
>>
>> some non-strict mode like FlinkRelDistribution#requireStrict? For
>> example, source is pre-partitioned by a, b columns, if we are
>> consuming this source, and do a aggregate on a, b, c, can we utilize
>> this optimization?
>>
>>
>>
>> - Good point. Yes, there are some cases that non-strict mode will apply.
>> For example:
>>
>> - pre-partitioned columns and aggregate columns are the same but have
>> different order (e.g., source pre-partitioned w.r.t. a,b and aggregate has
>> a GROUP BY b,a)
>> - columns in the Exchange operator is a list-prefix of pre-partitoned
>> columns of source (e.g., source is pre-partitioned w.r.t. a,b,c and
>> Exchange's partition columns are a,b)
>>
>> Please let me know if the above answers your questions or if you have any
>> other comments.
>>
>> Regards,
>> Jeyhun
>>
>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
>>
>> Thanks Jeyhun for bringing up this discussion, it is really exiting,
>> +1 for the general idea.
>>
>> We also introduced a similar concept in Flink Batch internally to cope
>> with bucketed tables in Hive, it is a very important improvement.
>>
>> > One thing to note is that for join queries, the parallelism of each join
>> > source might be different. This might result in
>> > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
>> > different mappings of partitions to source operators).
>> > Therefore, it is the job of planner to detect this and adjust the
>> > parallelism. With that having in mind,
>> > the rest (how the split assigners perform) is consistent among many
>> > sources.
>>
>>
>> Could you elaborate a little more on this. I added my two cents here
>> about this part:
>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
>> if we cannot have a good greatest common divisor, like 127 + 128,
>> could we just utilize one side's pre-partitioned attribute, and let
>> another side just do the shuffle?
>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
>> we don't consider parallelism, we just remove unnecessary shuffles
>> according to the distribution columns. After this FLIP, the
>> parallelism may be bundled with source's partitions, then how will
>> this optimization accommodate with FlinkExpandConversionRule, will you
>> also change downstream operator's parallelisms if we want to also
>> remove subsequent shuffles?
>>
>>
>> Regarding the new optimization rule, have you also considered to allow
>> some non-strict mode like FlinkRelDistribution#requireStrict? For
>> example, source is pre-partitioned by a, b columns, if we are
>> consuming this source, and do a aggregate on a, b, c, can we utilize
>> this optimization?
>>
>> Jane Chan <qingyue....@gmail.com> 于2024年3月14日周四 15:24写道:
>>
>> >
>> > Hi Jeyhun,
>> >
>> > Thanks for your clarification.
>> >
>>
>> > > Once a new partition is detected, we add it to our existing mapping.
>>
>> Our
>>
>> > mapping looks like Map<Integer, Set<Integer>>
>>
>> subtaskToPartitionAssignment,
>>
>> > where it maps each source subtaskID to zero or more partitions.
>> >
>> > I understand your point. **It would be better if you could sync the
>> > content to the FLIP**.
>> >
>> > Another thing is I'm curious about what the physical plan looks like. Is
>> > there any specific info that will be added to the table source (like
>> > filter/project pushdown)? It would be great if you could attach an
>>
>> example
>>
>> > to the FLIP.
>> >
>> > Bests,
>> > Jane
>> >
>> > On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
>>
>> wrote:
>>
>> >
>>
>> > > Hi Jane,
>> > >
>> > > Thanks for your comments.
>> > >
>> > >
>> > > 1. Concerning the `sourcePartitions()` method, the partition
>>
>> information
>>
>> > > > returned during the optimization phase may not be the same as the
>>
>> > > partition
>>
>> > > > information during runtime execution. For long-running jobs,
>>
>> partitions
>>
>> > > may
>>
>> > > > be continuously created. Is this FLIP equipped to handle scenarios?
>>
>> > >
>> > >
>> > > - Good point. This scenario is definitely supported.
>> > > Once a new partition is added, or in general, new splits are
>> > > discovered,
>> > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
>> > > newSplits)
>> > > method will be called. Inside that method, we are able to detect if a
>>
>> split
>>
>> > > belongs to existing partitions or there is a new partition.
>> > > Once a new partition is detected, we add it to our existing mapping.
>>
>> Our
>>
>> > > mapping looks like Map<Integer, Set<Integer>>
>>
>> subtaskToPartitionAssignment,
>>
>> > > where
>> > > it maps each source subtaskID to zero or more partitions.
>> > >
>> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>>
>> > > > understand that it is also necessary to verify whether the hash key
>>
>> > > within
>>
>> > > > the Exchange node is consistent with the partition key defined in
>> the
>>
>> > > table
>>
>> > > > source that implements `SupportsPartitioning`.
>>
>> > >
>> > >
>> > > - Yes, I overlooked that point, fixed. Actually, the rule is much
>> > > complicated. I tried to simplify it in the FLIP. Good point.
>> > >
>> > >
>> > > 3. Could you elaborate on the desired physical plan and integration
>>
>> with
>>
>> > > > `CompiledPlan` to enhance the overall functionality?
>>
>> > >
>> > >
>> > > - For compiled plan, PartitioningSpec will be used, with a json tag
>> > > "Partitioning". As a result, in the compiled plan, the source operator
>>
>> will
>>
>> > > have
>> > > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
>>
>> plan.
>>
>> > > More about the implementation details below:
>> > >
>> > > --------------------------------
>> > > PartitioningSpec class
>> > > --------------------------------
>> > > @JsonTypeName("Partitioning")
>> > > public final class PartitioningSpec extends SourceAbilitySpecBase {
>> > > // some code here
>> > > @Override
>> > > public void apply(DynamicTableSource tableSource,
>>
>> SourceAbilityContext
>>
>> > > context) {
>> > > if (tableSource instanceof SupportsPartitioning) {
>> > > ((SupportsPartitioning<?>)
>>
>> tableSource).applyPartitionedRead();
>>
>> > > } else {
>> > > throw new TableException(
>> > > String.format(
>> > > "%s does not support
>>
>> SupportsPartitioning.",
>>
>> > > tableSource.getClass().getName()));
>> > > }
>> > > }
>> > > // some code here
>> > > }
>> > >
>> > > --------------------------------
>> > > SourceAbilitySpec class
>> > > --------------------------------
>> > > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
>> > > JsonTypeInfo.As.PROPERTY, property = "type")
>> > > @JsonSubTypes({
>> > > @JsonSubTypes.Type(value = FilterPushDownSpec.class),
>> > > @JsonSubTypes.Type(value = LimitPushDownSpec.class),
>> > > @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
>> > > @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
>> > > @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
>> > > @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
>> > > @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
>> > > @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
>> > > + @JsonSubTypes.Type(value = PartitioningSpec.class)
>>
>> //
>>
>> > > new added
>> > >
>> > >
>> > >
>> > > Please let me know if that answers your questions or if you have other
>> > > comments.
>> > >
>> > > Regards,
>> > > Jeyhun
>> > >
>> > >
>> > > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
>>
>> wrote:
>>
>> > >
>>
>> > > > Hi Jeyhun,
>> > > >
>> > > > Thank you for leading the discussion. I'm generally +1 with this
>>
>> > > proposal,
>>
>> > > > along with some questions. Please see my comments below.
>> > > >
>> > > > 1. Concerning the `sourcePartitions()` method, the partition
>>
>> information
>>
>> > > > returned during the optimization phase may not be the same as the
>>
>> > > partition
>>
>> > > > information during runtime execution. For long-running jobs,
>>
>> partitions
>>
>> > > may
>>
>> > > > be continuously created. Is this FLIP equipped to handle scenarios?
>> > > >
>> > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>> > > > understand that it is also necessary to verify whether the hash key
>>
>> > > within
>>
>> > > > the Exchange node is consistent with the partition key defined in
>> the
>>
>> > > table
>>
>> > > > source that implements `SupportsPartitioning`.
>> > > >
>> > > > 3. Could you elaborate on the desired physical plan and integration
>>
>> with
>>
>> > > > `CompiledPlan` to enhance the overall functionality?
>> > > >
>> > > > Best,
>> > > > Jane
>> > > >
>> > > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
>>
>> <jhug...@confluent.io.invalid
>>
>> > > >
>> > > > wrote:
>> > > >
>>
>> > > > > Hi Jeyhun,
>> > > > >
>> > > > > I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
>> > > > > generalize FLIP-434 to be about "pre-divided" data to cover
>>
>> "buckets"
>>
>> > > and
>>
>> > > > > "partitions" (and maybe even situations where a data source is
>>
>> > > > partitioned
>>
>> > > > > and bucketed).
>> > > > >
>> > > > > Separate from that, the page mentions TPC-H Q1 as an example. For
>>
>> a
>>
>> > > > join,
>>
>> > > > > any two tables joined on the same bucket key should provide a
>>
>> concrete
>>
>> > > > > example of a join. Systems like Kafka Streams/ksqlDB call this
>> > > > > "co-partitioning"; for those systems, it is a requirement placed
>>
>> on the
>>
>> > > > > input sources. For Flink, with FLIP-434, the proposed planner rule
>> > > > > could remove the shuffle.
>> > > > >
>> > > > > Definitely a fun idea; I look forward to hearing more!
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > Jim
>> > > > >
>> > > > >
>> > > > > 1.
>> > > > >
>> > > > >
>>
>> > > >
>>
>> > >
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>>
>> > > > > 2.
>> > > > >
>> > > > >
>>
>> > > >
>>
>> > >
>>
>>
>> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>>
>> > > > >
>> > > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <
>>
>> je.kari...@gmail.com>
>>
>> > > > > wrote:
>> > > > >
>>
>> > > > > > Hi devs,
>> > > > > >
>> > > > > > I’d like to start a discussion on FLIP-434: Support
>>
>> optimizations for
>>
>> > > > > > pre-partitioned data sources [1].
>> > > > > >
>> > > > > > The FLIP introduces taking advantage of pre-partitioned data
>>
>> sources
>>
>> > > > for
>>
>> > > > > > SQL/Table API (it is already supported as experimental feature
>> in
>> > > > > > DataStream API [2]).
>> > > > > >
>> > > > > >
>> > > > > > Please find more details in the FLIP wiki document [1].
>> > > > > > Looking forward to your feedback.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Jeyhun
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > > >
>>
>> > > > >
>>
>> > > >
>>
>> > >
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
>>
>> > > > > > [2]
>> > > > > >
>> > > > > >
>>
>> > > > >
>>
>> > > >
>>
>> > >
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
>>
>> > > > > >
>>
>> > > > >
>>
>> > > >
>>
>> > >
>>
>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>>

Reply via email to