Hi everyone,

Thanks for your valuable feedback!

The discussion on this FLIP has been going on for a while. I would like
to start a vote after 48 hours.

Please let me know if you have any concerns or any further
questions/comments.

Regards,
Jeyhun

On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Lorenzo,
>
> Thanks a lot for your comments. Please find my answers below:
>
>> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement it, partitions must exist (at most,
>> return an empty list). Returning `Optional` seems just to complicate
>> the logic of the code using that interface.
>
> - The reasoning behind the use of Optional is that sometimes (e.g., in
>   HiveTableSource) the partitioning info is in the catalog.
>   Therefore, we return Optional.empty(), so that the list of partitions
>   is queried from the catalog.
>
>> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a
>> runtime exception". Let's simply make that safe at compile time and
>> guarantee to the code that partitions exist.
>
> - Yes, if the partitions cannot be found, either in the catalog or via
>   the interface implementation, we raise an exception at query compile
>   time.
>
>> Another thing is that you show Hive-like partitioning in your FS
>> structure; do you think it makes sense to add a note about
>> auto-discovery of partitions?
>
> - Yes, the FLIP contains just an example partitioning for the filesystem
>   connector. Each connector already "knows" how to auto-discover its
>   partitions, and we rely on this fact.
>   For example, partition discovery is different between Kafka and
>   filesystem sources. So, we do not handle the manual discovery of
>   partitions. Please correct me if I misunderstood your question.
>
>> In other terms, it looks a bit counterintuitive that the user
>> implementing the source has to specify which partitions exist statically
>> (and they can change at runtime), while the source itself knows the data
>> provider and can directly implement a method `discoverPartitions`. Then
>> Flink would take care of invoking that method when needed.
>
> We use the table option SOURCE_MONITOR_INTERVAL to check whether
> partitions are static or not. So, a user should still give Flink a hint
> about whether partitions are static. With static partitions, Flink can
> do more optimizations.
>
> Please let me know if my replies answer your questions or if you have
> more comments.
>
> Regards,
> Jeyhun
>
> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com> wrote:
>
>> Hello Jeyhun,
>> I really like the proposal and it definitely makes sense to me.
>>
>> I have a couple of nits here and there:
>>
>> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement it, partitions must exist (at most,
>> return an empty list). Returning `Optional` seems just to complicate
>> the logic of the code using that interface.
>>
>> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a
>> runtime exception". Let's simply make that safe at compile time and
>> guarantee to the code that partitions exist.
>>
>> Another thing is that you show Hive-like partitioning in your FS
>> structure; do you think it makes sense to add a note about
>> auto-discovery of partitions?
>>
>> In other terms, it looks a bit counterintuitive that the user
>> implementing the source has to specify which partitions exist statically
>> (and they can change at runtime), while the source itself knows the data
>> provider and can directly implement a method `discoverPartitions`. Then
>> Flink would take care of invoking that method when needed.
>>
>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com>,
>> wrote:
>>
>> Hi Benchao,
>>
>> Thanks for your comments.
>>
>> > 1. What parallelism would you take? E.g., 128 + 256 => 128? What
>> > if we cannot have a good greatest common divisor, like 127 + 128,
>> > could we just utilize one side's pre-partitioned attribute, and let
>> > another side just do the shuffle?
>>
>> There are two cases we need to consider:
>>
>> 1. Static Partition (no partitions are added during the query execution)
>> is enabled AND both sources implement "SupportsPartitionPushDown"
>>
>> In this case, we are sure that no new partitions will be added at runtime.
>> So, we have a chance to equalize both sources' partitions and parallelism,
>> IFF both sources implement the "SupportsPartitionPushDown" interface.
>> To achieve this, we first fetch the existing partitions from source1
>> (say p_s1) and from source2 (say p_s2).
>> Then, we find the intersection of these two partition sets (say
>> p_intersect) and push down these partitions:
>>
>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
>> only specific partitions are read
>> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned
>> read with filtered partitions
>>
>> Lastly, we need to change the parallelism of 1) source1, 2) source2, and
>> 3) all of their downstream operators until (and including) their first
>> common ancestor (e.g., join) to be equal to the number of partitions
>> (size of p_intersect).
>>
>> 2. All other cases
>>
>> In all other cases, the parallelism of both sources and their downstream
>> operators until their common ancestor would be equal to MIN(p_s1, p_s2).
>> That is, the smaller of the number of partitions of source1 and the
>> number of partitions of source2 will be selected as the parallelism.
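
The two cases above can be sketched in Java as follows. This is only an illustration of the rule as described in this thread, not code from the FLIP: the class PartitionParallelismSketch, its methods, the use of strings as partition identifiers, and the lower bound of 1 for an empty intersection are all assumptions made for the example.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the parallelism rule; not part of Flink or the FLIP. */
public final class PartitionParallelismSketch {

    /** Partitions present in both sources (p_intersect in the thread). */
    public static Set<String> intersect(Set<String> pS1, Set<String> pS2) {
        Set<String> pIntersect = new TreeSet<>(pS1);
        pIntersect.retainAll(pS2);
        return pIntersect;
    }

    /**
     * Case 1: static partitions and both sources support partition push-down,
     * so the parallelism is the size of the intersection.
     * Case 2: otherwise, fall back to the MIN rule on the partition counts.
     */
    public static int chooseParallelism(
            Set<String> pS1,
            Set<String> pS2,
            boolean staticPartitions,
            boolean bothSupportPushDown) {
        int parallelism;
        if (staticPartitions && bothSupportPushDown) {
            parallelism = intersect(pS1, pS2).size();
        } else {
            parallelism = Math.min(pS1.size(), pS2.size());
        }
        // Assumption of this sketch: never return a parallelism below 1,
        // e.g. when the two sources share no partitions.
        return Math.max(1, parallelism);
    }

    public static void main(String[] args) {
        Set<String> pS1 = Set.of("dt=1", "dt=2", "dt=3");
        Set<String> pS2 = Set.of("dt=2", "dt=3", "dt=4");

        System.out.println("p_intersect = " + intersect(pS1, pS2));
        System.out.println("case 1 parallelism = "
                + chooseParallelism(pS1, pS2, true, true));
        System.out.println("case 2 parallelism = "
                + chooseParallelism(pS1, pS2, false, true));
    }
}
```

With partitions {dt=1, dt=2, dt=3} and {dt=2, dt=3, dt=4}, case 1 gives a parallelism of 2 (the shared partitions dt=2 and dt=3), while the fallback MIN rule gives 3.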
>>
>> Coming back to your example, if source1 parallelism is 127 and source2
>> parallelism is 128, then we will first check the number of partitions of
>> source1 and source2.
>> Say source1 has 100 partitions and source2 has 90 partitions.
>> Then, we would set the parallelism for source1, source2, and all of their
>> downstream operators until (and including) the join operator
>> to 90 (min(100, 90)).
>> We also plan to implement a cost-based decision instead of the rule-based
>> one (the MIN rule explained above).
>> One possible result of the cost-based estimation is to keep the partitions
>> on one side and perform the shuffling on the other source.
>>
>> > 2. In our current shuffle remove design (FlinkExpandConversionRule),
>> > we don't consider parallelism, we just remove unnecessary shuffles
>> > according to the distribution columns. After this FLIP, the
>> > parallelism may be bundled with source's partitions, then how will
>> > this optimization work with FlinkExpandConversionRule, will you
>> > also change downstream operator's parallelisms if we want to also
>> > remove subsequent shuffles?
>>
>> - From my understanding of FlinkExpandConversionRule, its removal logic is
>> agnostic to operator parallelism.
>> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
>> then this FLIP will search for another possible shuffle (the one closest
>> to the source) to remove.
>> If there is such an opportunity, this FLIP will remove the shuffle. So,
>> from my understanding, FlinkExpandConversionRule and this optimization
>> rule can work together safely.
>> Please correct me if I misunderstood your question.
>>
>> > Regarding the new optimization rule, have you also considered allowing
>> > some non-strict mode like FlinkRelDistribution#requireStrict? For
>> > example, source is pre-partitioned by a, b columns, if we are
>> > consuming this source, and do an aggregate on a, b, c, can we utilize
>> > this optimization?
>>
>> - Good point. Yes, there are some cases where non-strict mode will apply.
>> For example:
>>
>> - pre-partitioned columns and aggregate columns are the same but have
>> a different order (e.g., source pre-partitioned w.r.t. a,b and aggregate
>> has a GROUP BY b,a)
>> - columns in the Exchange operator are a list-prefix of the
>> pre-partitioned columns of the source (e.g., source is pre-partitioned
>> w.r.t. a,b,c and Exchange's partition columns are a,b)
>>
>> Please let me know if the above answers your questions or if you have any
>> other comments.
>>
>> Regards,
>> Jeyhun
>>
>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
>>
>> Thanks Jeyhun for bringing up this discussion, it is really exciting,
>> +1 for the general idea.
>>
>> We also introduced a similar concept in Flink Batch internally to cope
>> with bucketed tables in Hive, it is a very important improvement.
>>
>> > One thing to note is that for join queries, the parallelism of each join
>> > source might be different. This might result in
>> > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
>> > different mappings of partitions to source operators).
>> > Therefore, it is the job of planner to detect this and adjust the
>> > parallelism. With that in mind,
>> > the rest (how the split assigners perform) is consistent among many
>> > sources.
>>
>> Could you elaborate a little more on this? I added my two cents here
>> about this part:
>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
>> if we cannot have a good greatest common divisor, like 127 + 128,
>> could we just utilize one side's pre-partitioned attribute, and let
>> another side just do the shuffle?
>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
>> we don't consider parallelism, we just remove unnecessary shuffles
>> according to the distribution columns. After this FLIP, the
>> parallelism may be bundled with source's partitions, then how will
>> this optimization work with FlinkExpandConversionRule, will you
>> also change downstream operator's parallelisms if we want to also
>> remove subsequent shuffles?
>>
>> Regarding the new optimization rule, have you also considered allowing
>> some non-strict mode like FlinkRelDistribution#requireStrict? For
>> example, source is pre-partitioned by a, b columns, if we are
>> consuming this source, and do an aggregate on a, b, c, can we utilize
>> this optimization?
>>
>> On Thu, Mar 14, 2024 at 15:24, Jane Chan <qingyue....@gmail.com> wrote:
>>
>> > Hi Jeyhun,
>> >
>> > Thanks for your clarification.
>> >
>> > > Once a new partition is detected, we add it to our existing mapping.
>> > > Our mapping looks like
>> > > Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
>> > > where it maps each source subtaskID to zero or more partitions.
>> >
>> > I understand your point. **It would be better if you could sync the
>> > content to the FLIP**.
>> >
>> > Another thing is I'm curious about what the physical plan looks like. Is
>> > there any specific info that will be added to the table source (like
>> > filter/project pushdown)? It would be great if you could attach an
>> > example to the FLIP.
>> >
>> > Bests,
>> > Jane
>> >
>> > On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
>> > wrote:
>> >
>> > > Hi Jane,
>> > >
>> > > Thanks for your comments.
>> > >
>> > > > 1. Concerning the `sourcePartitions()` method, the partition
>> > > > information returned during the optimization phase may not be the
>> > > > same as the partition information during runtime execution. For
>> > > > long-running jobs, partitions may be continuously created. Is this
>> > > > FLIP equipped to handle such scenarios?
>> > >
>> > > - Good point. This scenario is definitely supported.
>> > > Once a new partition is added, or in general, new splits are
>> > > discovered, the
>> > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
>> > > method will be called. Inside that method, we are able to detect
>> > > whether a split belongs to an existing partition or to a new partition.
>> > > Once a new partition is detected, we add it to our existing mapping.
>> > > Our mapping looks like
>> > > Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
>> > > where it maps each source subtaskID to zero or more partitions.
>> > >
>> > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>> > > > understand that it is also necessary to verify whether the hash key
>> > > > within the Exchange node is consistent with the partition key
>> > > > defined in the table source that implements `SupportsPartitioning`.
>> > >
>> > > - Yes, I overlooked that point, fixed. Actually, the rule is much more
>> > > complicated. I tried to simplify it in the FLIP. Good point.
>> > >
>> > > > 3. Could you elaborate on the desired physical plan and integration
>> > > > with `CompiledPlan` to enhance the overall functionality?
>> > >
>> > > - For the compiled plan, PartitioningSpec will be used, with a JSON tag
>> > > "Partitioning". As a result, in the compiled plan, the source operator
>> > > will have
>> > > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
>> > > plan.
>> > > More about the implementation details below:
>> > >
>> > > --------------------------------
>> > > PartitioningSpec class
>> > > --------------------------------
>> > > @JsonTypeName("Partitioning")
>> > > public final class PartitioningSpec extends SourceAbilitySpecBase {
>> > >     // some code here
>> > >     @Override
>> > >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
>> > >         if (tableSource instanceof SupportsPartitioning) {
>> > >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
>> > >         } else {
>> > >             throw new TableException(
>> > >                     String.format(
>> > >                             "%s does not support SupportsPartitioning.",
>> > >                             tableSource.getClass().getName()));
>> > >         }
>> > >     }
>> > >     // some code here
>> > > }
>> > >
>> > > --------------------------------
>> > > SourceAbilitySpec class
>> > > --------------------------------
>> > > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
>> > > @JsonSubTypes({
>> > >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
>> > >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
>> > >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
>> > >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
>> > >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
>> > >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
>> > >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
>> > >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
>> > > +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
>> > >
>> > > Please let me know if that answers your questions or if you have other
>> > > comments.
>> > >
>> > > Regards,
>> > > Jeyhun
>> > >
>> > > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Jeyhun,
>> > > >
>> > > > Thank you for leading the discussion. I'm generally +1 with this
>> > > > proposal, along with some questions. Please see my comments below.
>> > > >
>> > > > 1. Concerning the `sourcePartitions()` method, the partition
>> > > > information returned during the optimization phase may not be the
>> > > > same as the partition information during runtime execution. For
>> > > > long-running jobs, partitions may be continuously created. Is this
>> > > > FLIP equipped to handle such scenarios?
>> > > >
>> > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>> > > > understand that it is also necessary to verify whether the hash key
>> > > > within the Exchange node is consistent with the partition key
>> > > > defined in the table source that implements `SupportsPartitioning`.
>> > > >
>> > > > 3. Could you elaborate on the desired physical plan and integration
>> > > > with `CompiledPlan` to enhance the overall functionality?
>> > > >
>> > > > Best,
>> > > > Jane
>> > > >
>> > > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
>> > > > <jhug...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi Jeyhun,
>> > > > >
>> > > > > I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
>> > > > > generalize FLIP-434 to be about "pre-divided" data to cover
>> > > > > "buckets" and "partitions" (and maybe even situations where a data
>> > > > > source is partitioned and bucketed).
>> > > > >
>> > > > > Separate from that, the page mentions TPC-H Q1 as an example. For a
>> > > > > join, any two tables joined on the same bucket key should provide a
>> > > > > concrete example of a join. Systems like Kafka Streams/ksqlDB call
>> > > > > this "co-partitioning"; for those systems, it is a requirement
>> > > > > placed on the input sources. For Flink, with FLIP-434, the proposed
>> > > > > planner rule could remove the shuffle.
>> > > > >
>> > > > > Definitely a fun idea; I look forward to hearing more!
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > Jim
>> > > > >
>> > > > > 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>> > > > > 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>> > > > >
>> > > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
>> > > > > <je.kari...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi devs,
>> > > > > >
>> > > > > > I’d like to start a discussion on FLIP-434: Support optimizations
>> > > > > > for pre-partitioned data sources [1].
>> > > > > >
>> > > > > > The FLIP proposes taking advantage of pre-partitioned data
>> > > > > > sources for SQL/Table API (it is already supported as an
>> > > > > > experimental feature in DataStream API [2]).
>> > > > > >
>> > > > > > Please find more details in the FLIP wiki document [1].
>> > > > > > Looking forward to your feedback.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Jeyhun
>> > > > > >
>> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
>> > > > > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
>>
>> --
>>
>> Best,
>> Benchao Li