Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-09 Thread Jeyhun Karimov
Hi Lincoln,

I think I was misunderstood.
My approach was not to use MiniBatchLocalGroupAggFunction directly, but to
use a similar approach.
Currently, local and global aggregate functions are used together in query
plans.
In my quick PoC, I verified that a modified version of
MiniBatchLocalGroupAggFunction (used without timers) can work without
global aggregation, using just a different implementation of
MapBundleFunction.
So, we do not use the global aggregate function (which depends on keyed
state) at all.

The general idea is similar to pre-shuffle pre-aggregation [1]. In
our case, we utilize only the pre-aggregation part, with no shuffle after it.
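
To illustrate the idea, here is a minimal self-contained sketch (not
Flink's actual MapBundleFunction class, whose exact signatures may differ):
the per-key accumulators live in a plain operator-local map and are emitted
when the bundle closes, so neither keyed state nor a keyGroupRange is
involved.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class LocalCountBundle<K> {
    // Accumulators live in an operator-local map, not in keyed state,
    // so the operator is not tied to any keyGroupRange.
    private final Map<K, Long> bundle = new HashMap<>();

    /** Called per input record: fold it into the in-memory accumulator. */
    public void addInput(K key) {
        bundle.merge(key, 1L, Long::sum);
    }

    /** Called when the bundle is full: emit the partial aggregates. */
    public void finishBundle(BiConsumer<K, Long> out) {
        bundle.forEach(out);
        bundle.clear();
    }

    public static void main(String[] args) {
        LocalCountBundle<String> b = new LocalCountBundle<>();
        b.addInput("a");
        b.addInput("a");
        b.addInput("b");
        b.finishBundle((k, v) -> System.out.println(k + " -> " + v));
    }
}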

I am also OK with scoping the FLIP to batch scenarios first and creating a
PR for streaming scenarios afterwards (since the streaming implementation
does not change public interfaces).
WDYT?

[1] https://github.com/TU-Berlin-DIMA/AdCom





On Tue, Apr 9, 2024 at 5:34 PM Lincoln Lee  wrote:

> Thanks Jeyhun for your reply!
>
> Unfortunately, MiniBatchLocalGroupAggFunction only works for local agg
> in two-phase aggregation, while global aggregation (which is actually
> handled
> by the KeyedMapBundleOperator) still relies on the KeyedStream, meaning
> that consistency of the partitioner and state key selector is still
> required.
>
> Best,
> Lincoln Lee
>
>
> Jeyhun Karimov wrote on Sat, Apr 6, 2024 at 05:11:
>
> > Hi Lincoln,
> >
> > I did a bit of analysis with a small PoC.
> > Please find my comments below:
> >
> > - In general, the current design supports streaming workloads. However,
> > as you mentioned, it comes with some (implementation-related)
> > difficulties.
> > One of them (as you also mentioned) is that most of the operators
> > utilize keyed functions (e.g., aggregate functions).
> > As a result, we cannot directly utilize these operators (e.g.,
> > StreamPhysicalGroupbyAggregate), because they work on keyed inputs and
> > their tasks utilize a specific keyGroupRange.
> >
> > - As I mentioned above, my idea is to utilize an approach similar
> > to MiniBatchLocalGroupAggFunction that is not time-based and also
> > supports retractions.
> > The existing implementation of this function already covers quite a big
> > part of the scope. With this implementation, we utilize
> > MapBundleFunction, which is not bound to a specific key range.
> >
> > - As the next milestone, a more generic optimization is required that
> > introduces 1) a new streaming distribution type, KEEP_INPUT_AS_IS,
> > 2) utilization of a ForwardHashExchangeProcessor, and 3) a
> > corresponding chaining strategy.
> >
> > Currently, the plan is to first support this FLIP for batch workloads
> > (e.g., files, pre-divided data, and buckets), and then add support for
> > streaming workloads.
> >
> > I hope I have answered your question.
> >
> > Regards,
> > Jeyhun
> >
> > On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee 
> wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thanks for your quick response!
> > >
> > > In the streaming scenario, a shuffle commonly occurs before the stateful
> > > operator, and there's a sanity check[1] when the stateful operator
> > > accesses the state. This implies the consistency requirement of the
> > > partitioner used for data shuffling and state key selector for state
> > > accessing (see KeyGroupStreamPartitioner for more details),
> > > otherwise, there may be state access errors. That is to say, in the
> > > streaming scenario, it is not only the strict requirement described in
> > > FlinkRelDistribution#requireStrict, but also the implied consistency of
> > > hash calculation.
> > >
> > > Also, if this flip targets both streaming and batch scenarios, it is
> > > recommended to do PoC validation for streaming as well.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-29430
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Leonard Xu wrote on Wed, Apr 3, 2024 at 14:25:
> > >
> > > > Hey, Jeyhun
> > > >
> > > > Thanks for kicking off this discussion. I have two questions about
> > > > streaming sources:
> > > >
> > > > (1) The FLIP motivation section says the Kafka broker is already
> > > > partitioned w.r.t. some key[s]. Is this the main use case in the
> > > > Kafka world? Partitioning by key fields is not the behavior of
> > > > Kafka's default partitioner [1], IIUC.
> > > >
> > > > (2) Considering that the FLIP's optimization scope covers both batch
> > > > and streaming pre-partitioned sources, could you add a streaming
> > > > source example to help me understand the FLIP better? I think Kafka
> > > > Source is a good candidate for a streaming source example; file
> > > > source is a good one for a batch source, and it really helped me to
> > > > follow the FLIP.
> > > >
> > > > Best,
> > > > Leonard
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> > > >
> > > >
> > > >
> > > > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> > > > >
> > > > > Hi Lincoln,
> > > > >

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-09 Thread Lincoln Lee
Thanks Jeyhun for your reply!

Unfortunately, MiniBatchLocalGroupAggFunction only works for the local agg
in two-phase aggregation, while the global aggregation (which is actually
handled by the KeyedMapBundleOperator) still relies on the KeyedStream,
meaning that consistency of the partitioner and the state key selector is
still required.

Best,
Lincoln Lee


Jeyhun Karimov wrote on Sat, Apr 6, 2024 at 05:11:

> Hi Lincoln,
>
> I did a bit of analysis with a small PoC.
> Please find my comments below:
>
> - In general, the current design supports streaming workloads. However, as
> you mentioned, it comes with some (implementation-related) difficulties.
> One of them (as you also mentioned) is that most of the operators utilize
> keyed functions (e.g., aggregate functions).
> As a result, we cannot directly utilize these operators (e.g.,
> StreamPhysicalGroupbyAggregate), because they work on keyed inputs and
> their tasks utilize a specific keyGroupRange.
>
> - As I mentioned above, my idea is to utilize an approach similar
> to MiniBatchLocalGroupAggFunction that is not time-based and also supports
> retractions.
> The existing implementation of this function already covers quite a big
> part of the scope. With this implementation, we utilize MapBundleFunction,
> which is not bound to a specific key range.
>
> - As the next milestone, a more generic optimization is required that
> introduces 1) a new streaming distribution type, KEEP_INPUT_AS_IS,
> 2) utilization of a ForwardHashExchangeProcessor, and 3) a corresponding
> chaining strategy.
>
> Currently, the plan is to first support this FLIP for batch workloads
> (e.g., files, pre-divided data, and buckets), and then add support for
> streaming workloads.
>
> I hope I have answered your question.
>
> Regards,
> Jeyhun
>
> On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee  wrote:
>
> > Hi Jeyhun,
> >
> > Thanks for your quick response!
> >
> > In the streaming scenario, a shuffle commonly occurs before the stateful
> > operator, and there's a sanity check[1] when the stateful operator
> > accesses the state. This implies the consistency requirement of the
> > partitioner used for data shuffling and state key selector for state
> > accessing (see KeyGroupStreamPartitioner for more details),
> > otherwise, there may be state access errors. That is to say, in the
> > streaming scenario, it is not only the strict requirement described in
> > FlinkRelDistribution#requireStrict, but also the implied consistency of
> > hash calculation.
> >
> > Also, if this flip targets both streaming and batch scenarios, it is
> > recommended to do PoC validation for streaming as well.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-29430
> >
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Leonard Xu wrote on Wed, Apr 3, 2024 at 14:25:
> >
> > > Hey, Jeyhun
> > >
> > > Thanks for kicking off this discussion. I have two questions about
> > > streaming sources:
> > >
> > > (1) The FLIP motivation section says the Kafka broker is already
> > > partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> > > world? Partitioning by key fields is not the behavior of Kafka's
> > > default partitioner [1], IIUC.
> > >
> > > (2) Considering that the FLIP's optimization scope covers both batch
> > > and streaming pre-partitioned sources, could you add a streaming source
> > > example to help me understand the FLIP better? I think Kafka Source is
> > > a good candidate for a streaming source example; file source is a good
> > > one for a batch source, and it really helped me to follow the FLIP.
> > >
> > > Best,
> > > Leonard
> > > [1]
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> > >
> > >
> > >
> > > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> > > >
> > > > Hi Lincoln,
> > > >
> > > > Thanks a lot for your comments. Please find my answers below.
> > > >
> > > >
> > > > 1. Is this flip targeted only at batch scenarios or does it include
> > > >> streaming?
> > > >> (The flip and the discussion did not explicitly mention this, but in
> > the
> > > >> draft pr, I only
> > > >> saw the implementation for batch scenarios
> > > >>
> > > >>
> > >
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > > >> <
> > > >>
> > >
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
> > > >>>
> > > >> )
> > >> If we expect this to also apply to streaming, then we need to consider
> > the
> > > >> stricter
> > > >> shuffle restrictions of streaming compared to batch (if support is
> > > >> considered,
> > > >> more discussion is needed here, let’s not expand for now). If it
> only
> > > >> applies to batch,
> > > >> it is recommended to clarify in the flip.
> > > >
> > > >
> > > > - The FLIP targets both streaming and batch scenarios.
> > > > Could you please elaborate more on what you mean by additional
> > 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-05 Thread Jeyhun Karimov
Hi Lincoln,

I did a bit of analysis with a small PoC.
Please find my comments below:

- In general, the current design supports streaming workloads. However, as
you mentioned, it comes with some (implementation-related) difficulties.
One of them (as you also mentioned) is that most of the operators utilize
keyed functions (e.g., aggregate functions).
As a result, we cannot directly utilize these operators (e.g.,
StreamPhysicalGroupbyAggregate), because they work on keyed inputs and their
tasks utilize a specific keyGroupRange.

- As I mentioned above, my idea is to utilize an approach similar
to MiniBatchLocalGroupAggFunction that is not time-based and also supports
retractions.
The existing implementation of this function already covers quite a big
part of the scope. With this implementation, we utilize MapBundleFunction,
which is not bound to a specific key range.

- As the next milestone, a more generic optimization is required that
introduces 1) a new streaming distribution type, KEEP_INPUT_AS_IS,
2) utilization of a ForwardHashExchangeProcessor, and 3) a corresponding
chaining strategy.

Currently, the plan is to first support this FLIP for batch workloads
(e.g., files, pre-divided data, and buckets), and then add support for
streaming workloads.

I hope I have answered your question.

Regards,
Jeyhun

On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee  wrote:

> Hi Jeyhun,
>
> Thanks for your quick response!
>
> In the streaming scenario, a shuffle commonly occurs before the stateful
> operator, and there's a sanity check[1] when the stateful operator
> accesses the state. This implies the consistency requirement of the
> partitioner used for data shuffling and state key selector for state
> accessing (see KeyGroupStreamPartitioner for more details),
> otherwise, there may be state access errors. That is to say, in the
> streaming scenario, it is not only the strict requirement described in
> FlinkRelDistribution#requireStrict, but also the implied consistency of
> hash calculation.
>
> Also, if this flip targets both streaming and batch scenarios, it is
> recommended to do PoC validation for streaming as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-29430
>
>
> Best,
> Lincoln Lee
>
>
> Leonard Xu wrote on Wed, Apr 3, 2024 at 14:25:
>
> > Hey, Jeyhun
> >
> > Thanks for kicking off this discussion. I have two questions about
> > streaming sources:
> >
> > (1) The FLIP motivation section says the Kafka broker is already
> > partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> > world? Partitioning by key fields is not the behavior of Kafka's default
> > partitioner [1], IIUC.
> >
> > (2) Considering that the FLIP's optimization scope covers both batch and
> > streaming pre-partitioned sources, could you add a streaming source
> > example to help me understand the FLIP better? I think Kafka Source is a
> > good candidate for a streaming source example; file source is a good one
> > for a batch source, and it really helped me to follow the FLIP.
> >
> > Best,
> > Leonard
> > [1]
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> >
> >
> >
> > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> > >
> > > Hi Lincoln,
> > >
> > > Thanks a lot for your comments. Please find my answers below.
> > >
> > >
> > > 1. Is this flip targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The flip and the discussion did not explicitly mention this, but in
> the
> > >> draft pr, I only
> > >> saw the implementation for batch scenarios
> > >>
> > >>
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to consider
> the
> > >> stricter
> > >> shuffle restrictions of streaming compared to batch (if support is
> > >> considered,
> > >> more discussion is needed here, let’s not expand for now). If it only
> > >> applies to batch,
> > >> it is recommended to clarify in the flip.
> > >
> > >
> > > - The FLIP targets both streaming and batch scenarios.
> > > Could you please elaborate more on what you mean by additional
> > > restrictions?
> > >
> > >
> > > 2. In the current implementation, the optimized plan seems to have some
> > >> problems.
> > >> As described in the class comments:
> > >>
> > >>
> >
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >
> > > BatchPhysicalHashAggregate (local)
> > > +- BatchPhysicalLocalHashAggregate (local)
> > >    +- BatchPhysicalTableSourceScan
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> of
> > >> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-05 Thread Jeyhun Karimov
Hi Leonard,

I did a bit of analysis on a possible integration of this FLIP with the
Kafka source. Please find the sample code and explanations below:


public class KafkaDynamicSource implements ScanTableSource,
        SupportsReadingMetadata, SupportsWatermarkPushDown,
        SupportsPartitioning {

    // other parts of the codebase

    /** Returns a list of data units in each partition. */
    Optional<List<TopicPartition>> sourcePartitions() {
        // Note that here, our "data unit" is TopicPartition. If we
        // read from multiple topics, then a partitioned read is not
        // possible. Otherwise, we derive all partitions for the topic.
        if (topics.size() > 1 || topics.isEmpty()) {
            // Flink will fall back to the non-partitioned read
            return Optional.empty();
        }
        // exactly one topic at this point
        return deriveTopicPartitions(
                getTopicMetadata(adminClient, new HashSet<>(topics)));
    }

    void applyPartitionedRead() {
        // note that this has no impact in the Kafka source
    }
}


Since in the Kafka source each topic partition can be consumed by only one
Flink source instance, things become easier (than for the file source, for
example). Note that mapping each source partition to exactly one Flink
source instance is the main target of this FLIP; this is supported by
design in the Kafka source.
One more thing to mention: we need to make sure that the mapping (between
Kafka partitions and Flink source instances) is always deterministic under
the same Kafka configuration.
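
For illustration, the determinism requirement could be met by a sorted,
index-based assignment like the following sketch (a hypothetical helper,
not the actual Kafka connector logic):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

public class DeterministicAssignment {
    // Sort the partitions, then map each one to a fixed subtask. Under
    // the same topic configuration this yields the same mapping on every
    // submission, which is what the optimization relies on.
    static int ownerSubtask(
            List<TopicPartition> partitions, TopicPartition tp, int parallelism) {
        List<TopicPartition> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing(TopicPartition::toString));
        return sorted.indexOf(tp) % parallelism;
    }
}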

Hope that answers your question.

Regards,
Jeyhun




On Wed, Apr 3, 2024 at 6:53 PM Jeyhun Karimov  wrote:

> Hi Leonard,
>
> Thanks a lot for your comments. Please find my answers below:
>
> (1) The FLIP motivation section says the Kafka broker is already
>> partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
>> world? Partitioning by key fields is not the behavior of Kafka's default
>> partitioner [1], IIUC.
>
>
> - I see your point. The main use case for Kafka is for ksqlDB users.
> Consider a ksqlDB query [1]:
>
> CREATE STREAM products_rekeyed
>   WITH (PARTITIONS=6) AS
>   SELECT *
>FROM products
>PARTITION BY product_id;
>
> In this query, the output is re-partitioned to be keyed by product_id. In
> fact, as the documentation [1] mentions (and as Jim mentioned above), this
> repartitioning is a requirement for join queries.
>
>
> (2) Considering that the FLIP's optimization scope covers both batch and
>> streaming pre-partitioned sources, could you add a streaming source
>> example to help me understand the FLIP better? I think Kafka Source is a
>> good candidate for a streaming source example; file source is a good one
>> for a batch source, and it really helped me to follow the FLIP.
>
>
> - Currently, I do not have a sample Kafka Source integration with this FLIP.
> My idea was to integrate first with the Flink main repo (e.g., the file
> source in streaming & batch mode) and then with external connectors.
> But I can try with the Kafka Source and get back.
>
> Regards,
> Jeyhun
>
> [1] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/
>
> On Wed, Apr 3, 2024 at 8:25 AM Leonard Xu  wrote:
>
>> Hey, Jeyhun
>>
>> Thanks for kicking off this discussion. I have two questions about
>> streaming sources:
>>
>> (1) The FLIP motivation section says the Kafka broker is already
>> partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
>> world? Partitioning by key fields is not the behavior of Kafka's default
>> partitioner [1], IIUC.
>>
>> (2) Considering that the FLIP's optimization scope covers both batch and
>> streaming pre-partitioned sources, could you add a streaming source
>> example to help me understand the FLIP better? I think Kafka Source is a
>> good candidate for a streaming source example; file source is a good one
>> for a batch source, and it really helped me to follow the FLIP.
>>
>> Best,
>> Leonard
>> [1]
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>>
>>
>>
>> > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
>> >
>> > Hi Lincoln,
>> >
>> > Thanks a lot for your comments. Please find my answers below.
>> >
>> >
>> > 1. Is this flip targeted only at batch scenarios or does it include
>> >> streaming?
>> >> (The flip and the discussion did not explicitly mention this, but in
>> the
>> >> draft pr, I only
>> >> saw the implementation for batch scenarios
>> >>
>> >>
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> >> )
>> >> If we expect this to also apply to streaming, then we need to consider the
>> >> stricter
>> >> shuffle restrictions of streaming compared to batch (if support is
>> >> considered,
>> >> more discussion is needed 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Lincoln,

Thanks for your reply. My idea was to utilize MapBundleFunction, as it is
already used in a similar context in MiniBatchLocalGroupAggFunction.
I can also extend my PoC to streaming sources and get back to continue our
discussion.

Regards,
Jeyhun

On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee  wrote:

> Hi Jeyhun,
>
> Thanks for your quick response!
>
> In the streaming scenario, a shuffle commonly occurs before the stateful
> operator, and there's a sanity check[1] when the stateful operator
> accesses the state. This implies the consistency requirement of the
> partitioner used for data shuffling and state key selector for state
> accessing (see KeyGroupStreamPartitioner for more details),
> otherwise, there may be state access errors. That is to say, in the
> streaming scenario, it is not only the strict requirement described in
> FlinkRelDistribution#requireStrict, but also the implied consistency of
> hash calculation.
>
> Also, if this flip targets both streaming and batch scenarios, it is
> recommended to do PoC validation for streaming as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-29430
>
>
> Best,
> Lincoln Lee
>
>
> Leonard Xu wrote on Wed, Apr 3, 2024 at 14:25:
>
> > Hey, Jeyhun
> >
> > Thanks for kicking off this discussion. I have two questions about
> > streaming sources:
> >
> > (1) The FLIP motivation section says the Kafka broker is already
> > partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> > world? Partitioning by key fields is not the behavior of Kafka's default
> > partitioner [1], IIUC.
> >
> > (2) Considering that the FLIP's optimization scope covers both batch and
> > streaming pre-partitioned sources, could you add a streaming source
> > example to help me understand the FLIP better? I think Kafka Source is a
> > good candidate for a streaming source example; file source is a good one
> > for a batch source, and it really helped me to follow the FLIP.
> >
> > Best,
> > Leonard
> > [1]
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> >
> >
> >
> > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> > >
> > > Hi Lincoln,
> > >
> > > Thanks a lot for your comments. Please find my answers below.
> > >
> > >
> > > 1. Is this flip targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The flip and the discussion did not explicitly mention this, but in
> the
> > >> draft pr, I only
> > >> saw the implementation for batch scenarios
> > >>
> > >>
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to consider
> the
> > >> stricter
> > >> shuffle restrictions of streaming compared to batch (if support is
> > >> considered,
> > >> more discussion is needed here, let’s not expand for now). If it only
> > >> applies to batch,
> > >> it is recommended to clarify in the flip.
> > >
> > >
> > > - The FLIP targets both streaming and batch scenarios.
> > > Could you please elaborate more on what you mean by additional
> > > restrictions?
> > >
> > >
> > > 2. In the current implementation, the optimized plan seems to have some
> > >> problems.
> > >> As described in the class comments:
> > >>
> > >>
> >
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >
> > > BatchPhysicalHashAggregate (local)
> > > +- BatchPhysicalLocalHashAggregate (local)
> > >    +- BatchPhysicalTableSourceScan
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> > >> of one-phase hashAgg, localAgg is not necessary, which is the
> > >> scenario currently handled by `RemoveRedundantLocalHashAggRule` and
> > >> other rules)
> > >
> > >
> > > - Yes, you are completely right. Note that the PR you referenced is
> just
> > a
> > > quick PoC.
> > > Redundant operators you mentioned exist because
> > > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > > without modifying upstream/downstream operators.
> > > As I mentioned, the implementation is just a PoC and the end
> > implementation
> > > will make sure that existing redundancy elimination rules remove
> > redundant
> > > operators.
> > >
> > >
> > > Also, in the draft pr,
> > >> the optimization of `testShouldEliminatePartitioning1` &
> > >> `testShouldEliminatePartitioning2`
> > >> seems didn't take effect?
> > >>
> > >>
> >
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Leonard,

Thanks a lot for your comments. Please find my answers below:

> (1) The FLIP motivation section says the Kafka broker is already
> partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> world? Partitioning by key fields is not the behavior of Kafka's default
> partitioner [1], IIUC.


- I see your point. The main use case for Kafka is for ksqlDB users.
Consider a ksqlDB query [1]:

CREATE STREAM products_rekeyed
  WITH (PARTITIONS=6) AS
  SELECT *
  FROM products
  PARTITION BY product_id;

In this query, the output is re-partitioned to be keyed by product_id. In
fact, as the documentation [1] mentions (and as Jim mentioned above), this
repartitioning is a requirement for join queries.


> (2) Considering that the FLIP's optimization scope covers both batch and
> streaming pre-partitioned sources, could you add a streaming source
> example to help me understand the FLIP better? I think Kafka Source is a
> good candidate for a streaming source example; file source is a good one
> for a batch source, and it really helped me to follow the FLIP.


- Currently, I do not have a sample Kafka Source integration with this FLIP.
My idea was to integrate first with the Flink main repo (e.g., the file
source in streaming & batch mode) and then with external connectors.
But I can try with the Kafka Source and get back.

Regards,
Jeyhun

[1] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/

On Wed, Apr 3, 2024 at 8:25 AM Leonard Xu  wrote:

> Hey, Jeyhun
>
> Thanks for kicking off this discussion. I have two questions about
> streaming sources:
>
> (1) The FLIP motivation section says the Kafka broker is already
> partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> world? Partitioning by key fields is not the behavior of Kafka's default
> partitioner [1], IIUC.
>
> (2) Considering that the FLIP's optimization scope covers both batch and
> streaming pre-partitioned sources, could you add a streaming source
> example to help me understand the FLIP better? I think Kafka Source is a
> good candidate for a streaming source example; file source is a good one
> for a batch source, and it really helped me to follow the FLIP.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>
>
>
> > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> >
> > Hi Lincoln,
> >
> > Thanks a lot for your comments. Please find my answers below.
> >
> >
> > 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> )
> >> If we expect this to also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >
> >
> > - The FLIP targets both streaming and batch scenarios.
> > Could you please elaborate more on what you mean by additional
> > restrictions?
> >
> >
> > 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >
> > BatchPhysicalHashAggregate (local)
> > +- BatchPhysicalLocalHashAggregate (local)
> >    +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase hashAgg, localAgg is not necessary, which is the scenario
> >> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)
> >
> >
> > - Yes, you are completely right. Note that the PR you referenced is just
> a
> > quick PoC.
> > Redundant operators you mentioned exist because
> > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > without modifying upstream/downstream operators.
> > As I mentioned, the implementation is just a PoC and the end
> implementation
> > will make sure that existing redundancy elimination rules remove
> redundant
> > operators.
> >
> >
> > Also, in the draft pr,
> >> the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2`
> >> seems didn't take effect?
> >>
> >>
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Lincoln Lee
Hi Jeyhun,

Thanks for your quick response!

In the streaming scenario, a shuffle commonly occurs before the stateful
operator, and there is a sanity check [1] when the stateful operator
accesses the state. This implies a consistency requirement between the
partitioner used for data shuffling and the state key selector used for
state access (see KeyGroupStreamPartitioner for more details);
otherwise, there may be state access errors. That is to say, in the
streaming scenario, what matters is not only the strict requirement
described in FlinkRelDistribution#requireStrict, but also the implied
consistency of the hash calculation.
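
To make the requirement concrete: the subtask that owns a key is fully
determined by Flink's key-group hashing. A minimal sketch using the
runtime-internal KeyGroupRangeAssignment utility (shown only for
illustration):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        Object key = "product_42";
        // murmur-hashed key -> key group -> subtask index; the sanity
        // check fails if a record for this key arrives at any other
        // subtask (see FLINK-29430), so a pre-partitioned source must
        // reproduce exactly this routing to skip the shuffle.
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, parallelism);
        System.out.println("owning subtask = " + subtask);
    }
}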

Also, if this FLIP targets both streaming and batch scenarios, it is
recommended to do PoC validation for streaming as well.

[1] https://issues.apache.org/jira/browse/FLINK-29430


Best,
Lincoln Lee


Leonard Xu wrote on Wed, Apr 3, 2024 at 14:25:

> Hey, Jeyhun
>
> Thanks for kicking off this discussion. I have two questions about
> streaming sources:
>
> (1) The FLIP motivation section says the Kafka broker is already
> partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> world? Partitioning by key fields is not the behavior of Kafka's default
> partitioner [1], IIUC.
>
> (2) Considering that the FLIP's optimization scope covers both batch and
> streaming pre-partitioned sources, could you add a streaming source
> example to help me understand the FLIP better? I think Kafka Source is a
> good candidate for a streaming source example; file source is a good one
> for a batch source, and it really helped me to follow the FLIP.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>
>
>
> > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> >
> > Hi Lincoln,
> >
> > Thanks a lot for your comments. Please find my answers below.
> >
> >
> > 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> )
> >> If we expect this to also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >
> >
> > - The FLIP targets both streaming and batch scenarios.
> > Could you please elaborate more on what you mean by additional
> > restrictions?
> >
> >
> > 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >
> > BatchPhysicalHashAggregate (local)
> > +- BatchPhysicalLocalHashAggregate (local)
> >    +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase hashAgg, localAgg is not necessary, which is the scenario
> >> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)
> >
> >
> > - Yes, you are completely right. Note that the PR you referenced is just
> a
> > quick PoC.
> > Redundant operators you mentioned exist because
> > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > without modifying upstream/downstream operators.
> > As I mentioned, the implementation is just a PoC and the end
> implementation
> > will make sure that existing redundancy elimination rules remove
> redundant
> > operators.
> >
> >
> > Also, in the draft pr,
> >> the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2`
> >> seems didn't take effect?
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> >
> >
> > -  Note that in this example, Exchange operator have a
> > property KEEP_INPUT_AS_IS that indicates that data distribution is the
> same
> > as its input.
> > Since we have redundant operators (as shown above, two aggregate
> operators)
> > one of the rules (not in this FLIP)
> > adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> > Similar to my comment above, the end implementation will be except from
> > redundant operators.
> >
> > In conjunction with question 2, I am wondering if we have a better choice
> >> (of 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Leonard Xu
Hey, Jeyhun 

Thanks for kicking off this discussion. I have two questions about streaming 
sources:

(1) The FLIP motivation section says the Kafka broker is already partitioned
w.r.t. some key[s]. Is this the main use case in the Kafka world? Partitioning
by key fields is not the behavior of Kafka's default partitioner [1], IIUC.
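
(For context: when a record key is present, the linked partitioner places
the record with murmur2(keyBytes) % numPartitions. A sketch of that keyed
path, using utilities from kafka-clients:)

import org.apache.kafka.common.utils.Utils;

public class KeyedPlacementSketch {
    // Records with the same non-null key always land in the same
    // partition -- the property that key-partitioned topics rely on.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}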

(2) Considering that the FLIP's optimization scope covers both batch and
streaming pre-partitioned sources, could you add a streaming source example
to help me understand the FLIP better? I think Kafka Source is a good
candidate for a streaming source example; file source is a good one for a
batch source, and it really helped me to follow the FLIP.

Best,
Leonard
[1] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31



> On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov wrote:
> 
> Hi Lincoln,
> 
> Thanks a lot for your comments. Please find my answers below.
> 
> 
> 1. Is this flip targeted only at batch scenarios or does it include
>> streaming?
>> (The flip and the discussion did not explicitly mention this, but in the
>> draft pr, I only
>> saw the implementation for batch scenarios
>> 
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> )
>> If we expect this to also apply to streaming, then we need to consider the
>> stricter
>> shuffle restrictions of streaming compared to batch (if support is
>> considered,
>> more discussion is needed here, let’s not expand for now). If it only
>> applies to batch,
>> it is recommended to clarify in the flip.
> 
> 
> - The FLIP targets both streaming and batch scenarios.
> Could you please elaborate more on what you mean by additional
> restrictions?
> 
> 
> 2. In the current implementation, the optimized plan seems to have some
>> problems.
>> As described in the class comments:
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> 
> BatchPhysicalHashAggregate (local)
> +- BatchPhysicalLocalHashAggregate (local)
>    +- BatchPhysicalTableSourceScan
>> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
>> one-phase hashAgg, localAgg is not necessary, which is the scenario
>> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)
> 
> 
> - Yes, you are completely right. Note that the PR you referenced is just a
> quick PoC.
> Redundant operators you mentioned exist because
> `RemoveRedundantShuffleRule` just removes the Exchange operator,
> without modifying upstream/downstream operators.
> As I mentioned, the implementation is just a PoC and the end implementation
> will make sure that existing redundancy elimination rules remove redundant
> operators.
> 
> 
> Also, in the draft pr,
>> the optimization of `testShouldEliminatePartitioning1` &
>> `testShouldEliminatePartitioning2`
>> seems didn't take effect?
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> 
> 
> - Note that in this example, the Exchange operator has a
> property KEEP_INPUT_AS_IS that indicates that its data distribution is the
> same as its input's.
> Since we have redundant operators (as shown above, two aggregate operators),
> one of the rules (not in this FLIP)
> adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> Similar to my comment above, the final implementation will be free of
> redundant operators.
> 
> In conjunction with question 2, I am wondering if we have a better choice
>> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s
>> to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution`
>> and `RelCollation`) to the planner. The advantage of doing this is to
>> directly reuse the
>> current shuffle remove optimization (as `FlinkExpandConversionRule`
>> implemented),
>> and according to the data distribution characteristics provided by the
>> source, the planner
>> may choose a physical operator with a cheaper costs (for example, according
>> to `RelCollation`,
>> the planner can use sortAgg, no need for a separate local sort operation).
>> WDYT?
> 
> 
> - Good point. Makes sense to me. I will check how FlinkExpandConversionRule
> can be utilized in the implementation.
> 
> 
> Regards,
> Jeyhun
> 
> 
> 
> On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee  wrote:
> 
>> Hi Jeyhun,
>> 
> Thank you for driving this; it would be a very useful optimization!
>> 
>> Sorry for joining the discussion now(I 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-02 Thread Jeyhun Karimov
Hi Lincoln,

Thanks a lot for your comments. Please find my answers below.


> 1. Is this FLIP targeted only at batch scenarios, or does it include
> streaming?
> (The FLIP and the discussion did not explicitly mention this, but in the
> draft PR I only saw the implementation for batch scenarios:
>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>  )
> If we expect this to also apply to streaming, then we need to consider the
> stricter shuffle restrictions of streaming compared to batch (if support
> is considered, more discussion is needed here; let's not expand for now).
> If it only applies to batch, it is recommended to clarify that in the FLIP.


- The FLIP targets both streaming and batch scenarios.
 Could you please elaborate more on what you mean by additional
restrictions?


2. In the current implementation, the optimized plan seems to have some
> problems.
> As described in the class comments:
>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60

> BatchPhysicalHashAggregate (local)
> +- BatchPhysicalLocalHashAggregate (local)
>    +- BatchPhysicalTableSourceScan
> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> one-phase hashAgg, localAgg is not necessary, which is the scenario
> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)


- Yes, you are completely right. Note that the PR you referenced is just a
quick PoC.
The redundant operators you mentioned exist because
`RemoveRedundantShuffleRule` just removes the Exchange operator,
without modifying upstream/downstream operators.
As I mentioned, the implementation is just a PoC, and the final
implementation will make sure that the existing redundancy elimination rules
remove the redundant operators.


> Also, in the draft PR,
> the optimization of `testShouldEliminatePartitioning1` &
> `testShouldEliminatePartitioning2`
> seems not to have taken effect?
>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38


- Note that in this example, the Exchange operator has a
property KEEP_INPUT_AS_IS that indicates that its data distribution is the
same as its input's.
Since we have redundant operators (as shown above, two aggregate operators),
one of the rules (not in this FLIP)
adds this Exchange operator with KEEP_INPUT_AS_IS in between.
Similar to my comment above, the final implementation will be free of
redundant operators.

 In conjunction with question 2, I am wondering if we have a better choice
> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> `RemoveRedundantLocalXXRule`s
> to the `PHYSICAL_REWRITE`).
> For example, let the source actively provide some traits (including
> `FlinkRelDistribution`
> and `RelCollation`) to the planner. The advantage of doing this is to
> directly reuse the
> current shuffle remove optimization (as `FlinkExpandConversionRule`
> implemented),
> and according to the data distribution characteristics provided by the
> source, the planner
> may choose a physical operator with a cheaper costs (for example, according
> to `RelCollation`,
> the planner can use sortAgg, no need for a separate local sort operation).
> WDYT?


- Good point. Makes sense to me. I will check how FlinkExpandConversionRule
can be utilized in the implementation.


Regards,
Jeyhun



On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee  wrote:

> Hi Jeyhun,
>
> Thank you for driving this; it would be a very useful optimization!
>
> Sorry for joining the discussion only now (I originally planned to reply
> earlier, but it happened to be during my vacation). I have two questions:
>
> 1. Is this flip targeted only at batch scenarios or does it include
> streaming?
> (The flip and the discussion did not explicitly mention this, but in the
> draft pr, I only
> saw the implementation for batch scenarios
>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>  )
> If we expect this to also apply to streaming, then we need to consider the
> stricter
> shuffle restrictions of streaming compared to batch (if support is
> considered,
> more discussion is needed here, let’s not expand for now). If it only
> applies to batch,
> it is recommended to clarify in the flip.
>
> 2. In the current implementation, the optimized plan seems to have some
> problems.
> As described in the class comments:
>
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-02 Thread Lincoln Lee
Hi Jeyhun,

Thank you for driving this; it would be a very useful optimization!

Sorry for joining the discussion only now (I originally planned to reply
earlier, but it happened to be during my vacation). I have two questions:

1. Is this FLIP targeted only at batch scenarios, or does it include
streaming?
(The FLIP and the discussion did not explicitly mention this, but in the
draft PR I only saw the implementation for batch scenarios:
https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
 )
If we expect this to also apply to streaming, then we need to consider the
stricter shuffle restrictions of streaming compared to batch (if support is
considered, more discussion is needed here; let's not expand for now). If it
only applies to batch, it is recommended to clarify that in the FLIP.

2. In the current implementation, the optimized plan seems to have some
problems.
As described in the class comments:
https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60

BatchPhysicalHashAggregate (local)
+- BatchPhysicalLocalHashAggregate (local)
   +- BatchPhysicalTableSourceScan

The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
one-phase hashAgg, localAgg is not necessary, which is the scenario
currently handled by `RemoveRedundantLocalHashAggRule` and other rules).
Also, in the draft PR, the optimization of
`testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2`
seems not to have taken effect?
https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38

In conjunction with question 2, I am wondering if we have a better choice
(of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
`RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
For example, let the source actively provide some traits (including
`FlinkRelDistribution` and `RelCollation`) to the planner. The advantage of
doing this is to directly reuse the current shuffle-removal optimization
(as implemented by `FlinkExpandConversionRule`), and according to the data
distribution characteristics provided by the source, the planner may choose
a physical operator with a cheaper cost (for example, according to
`RelCollation`, the planner can use sortAgg, with no need for a separate
local sort operation).
WDYT?
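
(Conceptually, the check the planner would perform before removing an
exchange might look like the following toy sketch -- hypothetical names,
not actual planner code:)

import java.util.List;

public class DistributionCheck {
    // A shuffle is removable only if the source's hash keys match the
    // required distribution's keys and, for streaming, the source's
    // partitioning also reproduces Flink's key-group hash routing.
    static boolean exchangeIsRedundant(
            List<Integer> requiredHashKeys,
            List<Integer> sourceHashKeys,
            boolean hashFunctionMatches) {
        return requiredHashKeys.equals(sourceHashKeys) && hashFunctionMatches;
    }
}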


Best,
Lincoln Lee


Jeyhun Karimov wrote on Mon, Apr 1, 2024 at 18:00:

> Hi everyone,
>
> Thanks for your valuable feedback!
>
> The discussion on this FLIP has been going on for a while.
> I would like to start a vote after 48 hours.
>
> Please let me know if you have any concerns or any further
> questions/comments.
>
> Regards,
> Jeyhun
>
>
> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov 
> wrote:
>
> > Hi Lorenzo,
> >
> > Thanks a lot for your comments. Please find my answers below:
> >
> >
> > For the interface `SupportsPartitioning`, why return `Optional`?
> >> If one decides to implement that, partitions must exist (at maximum,
> >> return an empty list). Returning `Optional` seems just to complicate the
> >> logic of the code using that interface.
> >
> >
> > - The reasoning behind the use of Optional is that sometimes (e.g., in
> > HiveTableSource) the partitioning info is in catalog.
> >   Therefore, we return Optional.empty(), so that the list of partitions
> is
> > queried from the catalog.
> >
> >
> > I foresee the using code doing something like: "if the source supports
> >> partitioning, get the partitions, but if they don't exist, raise a
> runtime
> >> exception". Let's simply make that safe at compile time and guarantee
> the
> >> code that partitions exist.
> >
> >
> > - Yes, once partitions cannot be found, neither from catalog nor from the
> > interface implementation, then we raise an exception during query compile
> > time.
> >
> >
> >  Another thing is that you show Hive-like partitioning in your FS
> >> structure, do you think it makes sense to add a note about
> auto-discovery
> >> of partitions?
> >
> >
> > - Yes, the FLIP contains just an example partitioning for filesystem
> > connector. Each connector already "knows" about autodiscovery of its
> > partitions. And we rely on this fact.
> >   For example, partition discovery is different between kafka and
> > filesystem sources. So, we do not handle the manual discovery of
> > partitions. Please correct me if I misunderstood your question.
> >
> >
> > In other terms, it looks a bit counterintuitive that the user
> implementing
> >> the source has to specify which partitions exist statically (and they
> can
> >> change at runtime), while the source itself knows the data 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-01 Thread Jeyhun Karimov
Hi everyone,

Thanks for your valuable feedback!

The discussion on this FLIP has been going on for a while.
I would like to start a vote after 48 hours.

Please let me know if you have any concerns or any further
questions/comments.

Regards,
Jeyhun


On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov  wrote:

> Hi Lorenzo,
>
> Thanks a lot for your comments. Please find my answers below:
>
>
> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return an empty list). Returning `Optional` seems just to complicate the
>> logic of the code using that interface.
>
>
> - The reasoning behind the use of Optional is that sometimes (e.g., in
> HiveTableSource) the partitioning info is in catalog.
>   Therefore, we return Optional.empty(), so that the list of partitions is
> queried from the catalog.
>
>
> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>
>
> - Yes, if partitions cannot be found, neither from the catalog nor from the
> interface implementation, then we raise an exception at query compile
> time.
>
>
>  Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>
>
> - Yes, the FLIP contains just an example partitioning for filesystem
> connector. Each connector already "knows" about autodiscovery of its
> partitions. And we rely on this fact.
>   For example, partition discovery is different between kafka and
> filesystem sources. So, we do not handle the manual discovery of
> partitions. Please correct me if I misunderstood your question.
>
>
> In other terms, it looks a bit counterintuitive that the user implementing
>> the source has to specify which partitions exist statically (and they can
>> change at runtime), while the source itself knows the data provider and can
>> directly implement a method `discoverPartitions`. Then Flink would take
>> care of invoking that method when needed.
>
>
> We utilize table option SOURCE_MONITOR_INTERVAL to check whether
> partitions are static or not. So, a user still should give Flink a hint
> about partitions being static or not. With static partitions Flink can do
> more optimizations.
>
> Please let me know if my replies answer your questions or if you have more
> comments.
>
> Regards,
> Jeyhun
>
>
>
> On Thu, Mar 21, 2024 at 10:03 AM  wrote:
>
>> Hello Jeyhun,
>> I really like the proposal and definitely makes sense to me.
>>
>> I have a couple of nits here and there:
>>
>> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return an empty list). Returning `Optional` seems just to complicate the
>> logic of the code using that interface.
>>
>> I foresee the using code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>>
>> Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>>
>> In other terms, it looks a bit counterintuitive that the user
>> implementing the source has to specify which partitions exist statically
>> (and they can change at runtime), while the source itself knows the data
>> provider and can directly implement a method `discoverPartitions`. Then
>> Flink would take care of invoking that method when needed.
>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov ,
>> wrote:
>>
>> Hi Benchao,
>>
>> Thanks for your comments.
>>
1. What parallelism would you take? E.g., 128 + 256 => 128? What
>>
>> if we cannot have a good greatest common divisor, like 127 + 128,
>> could we just utilize one side's pre-partitioned attribute, and let
>> another side just do the shuffle?
>>
>>
>>
>> There are two cases we need to consider:
>>
>> 1. Static Partition (no partitions are added during the query execution)
>> is
>> enabled AND both sources implement "SupportsPartitionPushdown"
>>
>> In this case, we are sure that no new partitions will be added at runtime.
>> So, we have a chance to equalize both sources' partitions and parallelism,
>> IFF
>> both sources implement "SupportsPartitionPushdown" interface.
>> To achieve so, first we will fetch the existing partitions from source1
>> (say p_s1) and from source2 (say p_s2).
>> Then, we find the intersection of these two partition sets (say
>> p_intersect) and pushdown these partitions:
>>
>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
>> only specific partitions are read
>> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread Jeyhun Karimov
Hi Lorenzo,

Thanks a lot for your comments. Please find my answers below:


> For the interface `SupportsPartitioning`, why return `Optional`?
> If one decides to implement that, partitions must exist (at maximum,
> return an empty list). Returning `Optional` seems just to complicate the
> logic of the code using that interface.


- The reasoning behind the use of Optional is that sometimes (e.g., in
HiveTableSource) the partitioning info is in the catalog.
Therefore, we return Optional.empty(), so that the list of partitions is
queried from the catalog.
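
(As an illustration of this fallback -- a hypothetical sketch, not the
FLIP's final API:)

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class PartitionResolution {
    // Partitions come from the source if it knows them; otherwise the
    // planner consults the catalog (e.g., for HiveTableSource). If
    // neither side has them, query compilation fails.
    static List<String> resolvePartitions(
            Optional<List<String>> fromSource,
            Supplier<Optional<List<String>>> fromCatalog) {
        return fromSource
                .or(fromCatalog)
                .orElseThrow(() -> new IllegalStateException(
                        "No partition information available at plan time"));
    }
}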


> I foresee the using code doing something like: "if the source supports
> partitioning, get the partitions, but if they don't exist, raise a runtime
> exception". Let's simply make that safe at compile time and guarantee to
> the code that partitions exist.


- Yes, if partitions cannot be found, neither from the catalog nor from the
interface implementation, then we raise an exception at query compile
time.


> Another thing is that you show Hive-like partitioning in your FS
> structure; do you think it makes sense to add a note about auto-discovery
> of partitions?


- Yes, the FLIP contains just an example partitioning for the filesystem
connector. Each connector already "knows" about auto-discovery of its
partitions, and we rely on this fact.
For example, partition discovery differs between the Kafka and filesystem
sources. So, we do not handle the manual discovery of partitions. Please
correct me if I misunderstood your question.


In other terms, it looks a bit counterintuitive that the user implementing
> the source has to specify which partitions exist statically (and they can
> change at runtime), while the source itself knows the data provider and can
> directly implement a method `discoverPartitions`. Then Flink would take
> care of invoking that method when needed.


We utilize the table option SOURCE_MONITOR_INTERVAL to check whether partitions
are static or not. So, a user still needs to give Flink a hint about whether
partitions are static. With static partitions, Flink can apply more
optimizations, as the sketch below illustrates.
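
A minimal sketch of how the planner could evaluate that hint (the class and
method names are illustrative; 'source.monitor-interval' is the existing
filesystem connector option behind FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL):

import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

final class StaticPartitionCheck {

    private static final ConfigOption<Duration> SOURCE_MONITOR_INTERVAL =
            ConfigOptions.key("source.monitor-interval").durationType().noDefaultValue();

    // No monitor interval configured => the source never discovers new
    // partitions at runtime, so the planner may treat partitions as static
    // and apply the more aggressive optimizations discussed in the FLIP.
    static boolean partitionsAreStatic(ReadableConfig tableOptions) {
        return !tableOptions.getOptional(SOURCE_MONITOR_INTERVAL).isPresent();
    }
}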

Please let me know if my replies answer your questions or if you have more
comments.

Regards,
Jeyhun



On Thu, Mar 21, 2024 at 10:03 AM  wrote:

> Hello Jeyhun,
> I really like the proposal, and it definitely makes sense to me.
>
> I have a couple of nits here and there:
>
> For the interface `SupportsPartitioning`, why return `Optional`?
> If one decides to implement that, partitions must exist (at most,
> return an empty list). Returning `Optional` seems just to complicate the
> logic of the code using that interface.
>
> I foresee the calling code doing something like: "if the source supports
> partitioning, get the partitions, but if they don't exist, raise a runtime
> exception". Let's simply make that safe at compile time and guarantee to the
> calling code that partitions exist.
>
> Another thing is that you show Hive-like partitioning in your FS
> structure, do you think it makes sense to add a note about auto-discovery
> of partitions?
>
> In other terms, it looks a bit counterintuitive that the user implementing
> the source has to specify which partitions exist statically (and they can
> change at runtime), while the source itself knows the data provider and can
> directly implement a method `discoverPartitions`. Then Flink would take
> care of invoking that method when needed.
> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov ,
> wrote:
>
> Hi Benchao,
>
> Thanks for your comments.
>
> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
>
> if we cannot have a good greatest common divisor, like 127 + 128,
> could we just utilize one side's pre-partitioned attribute, and let
> the other side just do the shuffle?
>
>
>
> There are two cases we need to consider:
>
> 1. Static Partition (no partitions are added during the query execution) is
> enabled AND both sources implement "SupportsPartitionPushdown"
>
> In this case, we are sure that no new partitions will be added at runtime.
> So, we have a chance to equalize both sources' partitions and parallelism, IFF
> both sources implement "SupportsPartitionPushdown" interface.
> To achieve so, first we will fetch the existing partitions from source1
> (say p_s1) and from source2 (say p_s2).
> Then, we find the intersection of these two partition sets (say
> p_intersect) and pushdown these partitions:
>
> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> only specific partitions are read
> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> with filtered partitions
>
> Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
> all of their downstream operators until (and including) their first common
> ancestor (e.g., join) to be equal to the number of partitions (size of
> p_intersect).
>
> 2. All other cases
>
> In all other cases, the parallelism of both sources and their downstream
> operators until their common ancestor would be equal to the MIN(p_s1, p_s2).

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread Benchao Li
Jeyhun,

Sorry for the delay. And thanks for the explanation, it sounds good to me!

Jeyhun Karimov  wrote on Sat, Mar 16, 2024, at 05:09:
>
> Hi Benchao,
>
> Thanks for your comments.
>
> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> > the other side just do the shuffle?
>
>
> There are two cases we need to consider:
>
> 1. Static Partition (no partitions are added during the query execution) is
> enabled AND both sources implement "SupportsPartitionPushdown"
>
> In this case, we are sure that no new partitions will be added at runtime.
> So, we have a chance to equalize both sources' partitions and parallelism, IFF
> both sources implement "SupportsPartitionPushdown" interface.
> To achieve so, first we will fetch the existing partitions from source1
> (say p_s1) and from source2 (say p_s2).
> Then, we find the intersection of these two partition sets (say
> p_intersect) and pushdown these partitions:
>
> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> only specific partitions are read
> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> with filtered partitions
>
> Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
> all of their downstream operators until (and including) their first common
> ancestor (e.g., join) to be equal to the number of partitions (size of
> p_intersect).
>
> 2. All other cases
>
> In all other cases, the parallelism of both sources and their downstream
> operators until their common ancestor would be equal to the MIN(p_s1,
> p_s2).
> That is, minimum of the partition size of source1 and partition size of
> source2 will be selected as the parallelism.
> Coming back to your example, if source1 parallelism is 127 and source2
> parallelism is 128, then we will first check the partition size of source1
> and source2.
> Say partition size of source1 is 100 and partition size of source2 is 90.
> Then, we would set the parallelism for source1, source2, and all of their
> downstream operators until (and including) the join operator
> to 90 (min(100, 90)).
> We also plan to implement a cost-based decision instead of the rule-based
> one explained above (the MIN rule).
> One possible result of the cost-based estimation is to keep the partitions
> on one side and perform the shuffle on the other source.
>
>
>
> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns. After this FLIP, the
> > parallelism may be bundled with source's partitions, then how will
> > this optimization accommodate with FlinkExpandConversionRule, will you
> > also change downstream operator's parallelisms if we want to also
> > remove subsequent shuffles?
>
>
>
> - From my understanding of FlinkExpandConversionRule, its removal logic is
> agnostic to operator parallelism.
> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> then this FLIP will search another possible shuffle (the one closest to the
> source) to remove.
> If there is such an opportunity, this FLIP will remove the shuffle. So,
> from my understanding FlinkExpandConversionRule and this optimization rule
> can work together safely.
> Please correct me if I misunderstood your question.
>
>
>
> Regarding the new optimization rule, have you also considered allowing
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, the source is pre-partitioned by the a, b columns; if we are
> > consuming this source and do an aggregate on a, b, c, can we utilize
> > this optimization?
>
>
> - Good point. Yes, there are some cases where the non-strict mode applies.
> For example:
>
> - pre-partitioned columns and aggregate columns are the same but have a
> different order (e.g., source pre-partitioned w.r.t. a,b and the aggregate has
> a GROUP BY b,a)
> - the columns in the Exchange operator are a list-prefix of the pre-partitioned
> columns of the source (e.g., source is pre-partitioned w.r.t. a,b,c and the
> Exchange's partition columns are a,b)
>
> Please let me know if the above answers your questions or if you have any
> other comments.
>
> Regards,
> Jeyhun
>
> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li  wrote:
>
> > Thanks Jeyhun for bringing up this discussion, it is really exciting,
> > +1 for the general idea.
> >
> > We also introduced a similar concept in Flink Batch internally to cope
> > with bucketed tables in Hive, it is a very important improvement.
> >
> > > One thing to note is that for join queries, the parallelism of each join
> > > source might be different. This might result in
> > > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
> > > different mappings of partitions to source operators).
> > > Therefore, it is the job of the planner to detect this and adjust the parallelism.

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-21 Thread lorenzo . affetti
Hello Jeyhun,
I really like the proposal, and it definitely makes sense to me.

I have a couple of nits here and there:

For the interface `SupportsPartitioning`, why return `Optional`?
If one decides to implement that, partitions must exist (at most, return an
empty list). Returning `Optional` seems just to complicate the logic of the code
using that interface.

I foresee the calling code doing something like: "if the source supports
partitioning, get the partitions, but if they don't exist, raise a runtime
exception". Let's simply make that safe at compile time and guarantee to the
calling code that partitions exist.

Another thing is that you show Hive-like partitioning in your FS structure, do 
you think it makes sense to add a note about auto-discovery of partitions?

In other terms, it looks a bit counterintuitive that the user implementing the 
source has to specify which partitions exist statically (and they can change at 
runtime), while the source itself knows the data provider and can directly 
implement a method `discoverPartitions`. Then Flink would take care of invoking 
that method when needed.
On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov , wrote:
> Hi Benchao,
>
> Thanks for your comments.
>
> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > if we cannot have a good greatest common divisor, like 127 + 128,
> > could we just utilize one side's pre-partitioned attribute, and let
> > the other side just do the shuffle?
>
>
> There are two cases we need to consider:
>
> 1. Static Partition (no partitions are added during the query execution) is
> enabled AND both sources implement "SupportsPartitionPushdown"
>
> In this case, we are sure that no new partitions will be added at runtime.
> So, we have a chance to equalize both sources' partitions and parallelism, IFF
> both sources implement "SupportsPartitionPushdown" interface.
> To achieve so, first we will fetch the existing partitions from source1
> (say p_s1) and from source2 (say p_s2).
> Then, we find the intersection of these two partition sets (say
> p_intersect) and pushdown these partitions:
>
> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> only specific partitions are read
> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> with filtered partitions
>
> Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
> all of their downstream operators until (and including) their first common
> ancestor (e.g., join) to be equal to the number of partitions (size of
> p_intersect).
>
> 2. All other cases
>
> In all other cases, the parallelism of both sources and their downstream
> operators until their common ancestor would be equal to the MIN(p_s1,
> p_s2).
> That is, minimum of the partition size of source1 and partition size of
> source2 will be selected as the parallelism.
> Coming back to your example, if source1 parallelism is 127 and source2
> parallelism is 128, then we will first check the partition size of source1
> and source2.
> Say partition size of source1 is 100 and partition size of source2 is 90.
> Then, we would set the parallelism for source1, source2, and all of their
> downstream operators until (and including) the join operator
> to 90 (min(100, 90)).
> We also plan to implement a cost-based decision instead of the rule-based
> one explained above (the MIN rule).
> One possible result of the cost-based estimation is to keep the partitions
> on one side and perform the shuffle on the other source.
>
>
>
> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > we don't consider parallelism, we just remove unnecessary shuffles
> > according to the distribution columns. After this FLIP, the
> > parallelism may be bundled with source's partitions, then how will
> > this optimization accommodate with FlinkExpandConversionRule, will you
> > also change downstream operator's parallelisms if we want to also
> > remove subsequent shuffles?
>
>
>
> - From my understanding of FlinkExpandConversionRule, its removal logic is
> agnostic to operator parallelism.
> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> then this FLIP will search another possible shuffle (the one closest to the
> source) to remove.
> If there is such an opportunity, this FLIP will remove the shuffle. So,
> from my understanding FlinkExpandConversionRule and this optimization rule
> can work together safely.
> Please correct me if I misunderstood your question.
>
>
>
> Regarding the new optimization rule, have you also considered allowing
> > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > example, the source is pre-partitioned by the a, b columns; if we are
> > consuming this source and do an aggregate on a, b, c, can we utilize
> > this optimization?
>
>
> - Good point. Yes, there are some cases where the non-strict mode applies.
> For example:
>
> - pre-partitioned columns and aggregate columns are the same but have a
> different order (e.g., source pre-partitioned w.r.t. a,b and the aggregate has
> a GROUP BY b,a)

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-15 Thread Jeyhun Karimov
Hi Benchao,

Thanks for your comments.

1. What parallelism would you take? E.g., 128 + 256 => 128? What
> if we cannot have a good greatest common divisor, like 127 + 128,
> could we just utilize one side's pre-partitioned attribute, and let
> the other side just do the shuffle?


There are two cases we need to consider:

1. Static Partition (no partitions are added during the query execution) is
enabled AND both sources implement "SupportsPartitionPushdown"

In this case, we are sure that no new partitions will be added at runtime.
So, we have a chance to equalize both sources' partitions and parallelism, IFF
both sources implement "SupportsPartitionPushdown" interface.
To achieve so, first we will fetch the existing partitions from source1
(say p_s1) and from source2 (say p_s2).
Then, we find the intersection of these two partition sets (say
p_intersect) and pushdown these partitions:

SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
only specific partitions are read
SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
with filtered partitions

Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
all of their downstream operators until (and including) their first common
ancestor (e.g., join) to be equal to the number of partitions (size of
p_intersect).

2. All other cases

In all other cases, the parallelism of both sources and their downstream
operators until their common ancestor would be equal to the MIN(p_s1,
p_s2).
That is, minimum of the partition size of source1 and partition size of
source2 will be selected as the parallelism.
Coming back to your example, if source1 parallelism is 127 and source2
parallelism is 128, then we will first check the partition size of source1
and source2.
Say partition size of source1 is 100 and partition size of source2 is 90.
Then, we would set the parallelism for source1, source2, and all of their
downstream operators until (and including) the join operator
to 90 (min(100, 90)).
We also plan to implement a cost-based decision instead of the rule-based
one explained above (the MIN rule).
One possible result of the cost-based estimation is to keep the partitions
on one side and perform the shuffle on the other source. A sketch of both
parallelism rules follows.
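
The sketch (class and method names are illustrative, not the FLIP's actual
planner code; partitions are represented as Map<String, String> specs, as in
Flink's SupportsPartitionPushDown):

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class JoinSourceParallelism {

    // Case 1: static partitions AND both sources implement partition pushdown.
    // Push the intersection down to both sides; its size becomes the
    // parallelism of both sources and of the operators up to the join.
    static int forStaticPushdown(
            List<Map<String, String>> pS1, List<Map<String, String>> pS2) {
        Set<Map<String, String>> pIntersect = new HashSet<>(pS1);
        pIntersect.retainAll(pS2);
        // Then: SupportsPartitionPushDown::applyPartitions(pIntersect) and
        // SupportsPartitioning::applyPartitionedRead(pIntersect) on both sources.
        return pIntersect.size();
    }

    // Case 2 (all other cases): the MIN rule over the partition counts.
    static int minRule(int partitionCountS1, int partitionCountS2) {
        return Math.min(partitionCountS1, partitionCountS2); // e.g. min(100, 90) = 90
    }
}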



2. In our current shuffle remove design (FlinkExpandConversionRule),
> we don't consider parallelism, we just remove unnecessary shuffles
> according to the distribution columns. After this FLIP, the
> parallelism may be bundled with source's partitions, then how will
> this optimization accommodate with FlinkExpandConversionRule, will you
> also change downstream operator's parallelisms if we want to also
> remove subsequent shuffles?



- From my understanding of FlinkExpandConversionRule, its removal logic is
agnostic to operator parallelism.
So, if FlinkExpandConversionRule decides to remove a shuffle operation,
then this FLIP will search another possible shuffle (the one closest to the
source) to remove.
If there is such an opportunity, this FLIP will remove the shuffle. So,
from my understanding FlinkExpandConversionRule and this optimization rule
can work together safely.
Please correct me if I misunderstood your question.



Regarding the new optimization rule, have you also considered allowing
> some non-strict mode like FlinkRelDistribution#requireStrict? For
> example, the source is pre-partitioned by the a, b columns; if we are
> consuming this source and do an aggregate on a, b, c, can we utilize
> this optimization?


- Good point. Yes, there are some cases where the non-strict mode applies
(see the sketch after the examples). For example:

- pre-partitioned columns and aggregate columns are the same but have a
different order (e.g., source pre-partitioned w.r.t. a,b and the aggregate has
a GROUP BY b,a)
- the columns in the Exchange operator are a list-prefix of the pre-partitioned
columns of the source (e.g., source is pre-partitioned w.r.t. a,b,c and the
Exchange's partition columns are a,b)
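
Both checks are easy to state in code; a minimal illustrative helper (the
names are mine, not the FLIP's):

import java.util.List;

final class NonStrictPartitionMatch {

    // Reordered-columns case: GROUP BY b,a over a source partitioned on a,b
    // reduces to a set-equality check on the column lists.
    static boolean isSameColumnSet(List<String> aggKeys, List<String> partitionKeys) {
        return aggKeys.size() == partitionKeys.size() && aggKeys.containsAll(partitionKeys);
    }

    // Prefix case: Exchange keys [a, b] against source partition columns [a, b, c].
    static boolean isListPrefix(List<String> exchangeKeys, List<String> partitionKeys) {
        return exchangeKeys.size() <= partitionKeys.size()
                && exchangeKeys.equals(partitionKeys.subList(0, exchangeKeys.size()));
    }
}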

Please let me know if the above answers your questions or if you have any
other comments.

Regards,
Jeyhun

On Thu, Mar 14, 2024 at 12:48 PM Benchao Li  wrote:

> Thanks Jeyhun for bringing up this discussion, it is really exciting,
> +1 for the general idea.
>
> We also introduced a similar concept in Flink Batch internally to cope
> with bucketed tables in Hive, it is a very important improvement.
>
> > One thing to note is that for join queries, the parallelism of each join
> > source might be different. This might result in
> > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
> > different mappings of partitions to source operators).
> > Therefore, it is the job of the planner to detect this and adjust the
> > parallelism. With that in mind,
> > the rest (how the split assigners perform) is consistent among many
> > sources.
>
> Could you elaborate a little more on this. I added my two cents here
> about this part:
> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> if we cannot have a good greatest common divisor, like 127 + 128, could
> we just utilize one side's pre-partitioned attribute, and let the other
> side just do the shuffle?

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jeyhun Karimov
Hi Jane,

Thanks for your comments.

I understand your point.  **It would be better if you could sync the
> content to the FLIP**.


- Sure thing. I added my above answer to the FLIP.


Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be added to the table source (like
> filter/project pushdown)? It would be great if you could attach an example
> to the FLIP.


- For the physical plan, the table source will carry additional info
named "partitionedReading" or "partitionedRead". For example:

CREATE TABLE MyTableP (
  a bigint,
  b int,
  c varchar
) PARTITIONED BY (a, b) with (
 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '/root_dir')


SELECT a, b, COUNT(c) FROM MyTableP GROUP BY a, b


+- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTableP, partitionedReading]], fields=[a, b, c])


I also added this example to the FLIP.


Please let me know if that answers your question or if you have any other
comments.


Regards,

Jeyhun


On Thu, Mar 14, 2024 at 8:23 AM Jane Chan  wrote:

> Hi Jeyhun,
>
> Thanks for your clarification.
>
> > Once a new partition is detected, we add it to our existing mapping. Our
> mapping looks like Map> subtaskToPartitionAssignment,
> where it maps each source subtaskID to zero or more partitions.
>
> I understand your point.  **It would be better if you could sync the
> content to the FLIP**.
>
> Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be added to the table source (like
> filter/project pushdown)? It would be great if you could attach an example
> to the FLIP.
>
> Bests,
> Jane
>
> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov 
> wrote:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > partition
> > > information during runtime execution. For long-running jobs, partitions
> > may
> > > be continuously created. Is this FLIP equipped to handle such scenarios?
> >
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added, or in general, new splits are
> > discovered,
> > PartitionAwareSplitAssigner::addSplits(Collection
> > newSplits)
> > method will be called. Inside that method, we are able to detect if a
> split
> > belongs to existing partitions or there is a new partition.
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map>
> subtaskToPartitionAssignment,
> > where
> > it maps each source subtaskID to zero or more partitions.
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > within
> > > the Exchange node is consistent with the partition key defined in the
> > table
> > > source that implements `SupportsPartitioning`.
> >
> >
> > - Yes, I overlooked that point, fixed. Actually, the rule is much more
> > complicated. I tried to simplify it in the FLIP. Good point.
> >
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> >
> > - For compiled plan, PartitioningSpec will be used, with a json tag
> > "Partitioning". As a result, in the compiled plan, the source operator
> will
> > have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
> plan.
> > More about the implementation details below:
> >
> > 
> > PartitioningSpec class
> > 
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >  // some code here
> > @Override
> > public void apply(DynamicTableSource tableSource,
> SourceAbilityContext
> > context) {
> > if (tableSource instanceof SupportsPartitioning) {
> > ((SupportsPartitioning)
> tableSource).applyPartitionedRead();
> > } else {
> > throw new TableException(
> > String.format(
> > "%s does not support SupportsPartitioning.",
> > tableSource.getClass().getName()));
> > }
> > }
> >   // some code here
> > }
> >
> > 
> > SourceAbilitySpec class
> > 
> > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> > JsonTypeInfo.As.PROPERTY, property = "type")
> > @JsonSubTypes({
> > @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > @JsonSubTypes.Type(value = 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jeyhun Karimov
Hi Hang,

Thanks for the comments.

I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?


- It is a source configuration.

Besides that, there is a little mistake, unless I misunderstand something.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed as `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any` ?


- Yes, 'source' should be 'exchange' here.
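
To spell out the corrected condition, a small illustrative check (the class
and method names are mine, SupportsPartitioning stands for the interface
proposed in the FLIP, and the real rule matches on the planner's RelNode
tree, so it is more involved than this):

import java.util.List;
import org.apache.flink.table.connector.source.DynamicTableSource;

final class RedundantShuffleCheck {

    // Stub standing in for the FLIP's proposed source ability.
    interface SupportsPartitioning {}

    // The shuffle is redundant only if the upstream source is pre-partitioned
    // AND its partition keys match the hash keys of the downstream Exchange.
    static boolean shuffleIsRedundant(
            DynamicTableSource upstream,
            List<String> sourcePartitionKeys,
            List<String> exchangeHashKeys) {
        return upstream instanceof SupportsPartitioning
                && sourcePartitionKeys.equals(exchangeHashKeys);
    }
}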

I incorporated both points into the FLIP.

Please let me know if that answers your questions or if you have any other
comments.

Regards,
Jeyhun


On Thu, Mar 14, 2024 at 8:10 AM Hang Ruan  wrote:

> Hi, Jeyhun.
>
> Thanks for the FLIP. Totally +1 for it.
>
> I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?
>
> Besides that, there is a little mistake, unless I misunderstand something.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed as `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any` ?
>
> Best,
> Hang
>
Jeyhun Karimov  wrote on Wed, Mar 13, 2024, at 21:11:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > partition
> > > information during runtime execution. For long-running jobs, partitions
> > may
> > > be continuously created. Is this FLIP equipped to handle such scenarios?
> >
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added, or in general, new splits are
> > discovered,
> > PartitionAwareSplitAssigner::addSplits(Collection
> > newSplits)
> > method will be called. Inside that method, we are able to detect if a
> split
> > belongs to existing partitions or there is a new partition.
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map>
> subtaskToPartitionAssignment,
> > where
> > it maps each source subtaskID to zero or more partitions.
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > within
> > > the Exchange node is consistent with the partition key defined in the
> > table
> > > source that implements `SupportsPartitioning`.
> >
> >
> > - Yes, I overlooked that point, fixed. Actually, the rule is much more
> > complicated. I tried to simplify it in the FLIP. Good point.
> >
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> >
> > - For compiled plan, PartitioningSpec will be used, with a json tag
> > "Partitioning". As a result, in the compiled plan, the source operator
> will
> > have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
> plan.
> > More about the implementation details below:
> >
> > 
> > PartitioningSpec class
> > 
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >  // some code here
> > @Override
> > public void apply(DynamicTableSource tableSource,
> SourceAbilityContext
> > context) {
> > if (tableSource instanceof SupportsPartitioning) {
> > ((SupportsPartitioning)
> tableSource).applyPartitionedRead();
> > } else {
> > throw new TableException(
> > String.format(
> > "%s does not support SupportsPartitioning.",
> > tableSource.getClass().getName()));
> > }
> > }
> >   // some code here
> > }
> >
> > 
> > SourceAbilitySpec class
> > 
> > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> > JsonTypeInfo.As.PROPERTY, property = "type")
> > @JsonSubTypes({
> > @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > +  @JsonSubTypes.Type(value = PartitioningSpec.class)
>  //
> > new added
> >
> >
> >
> > Please let me know if that answers your questions or if you have other
> > comments.
> >
> > Regards,
> > Jeyhun
> >
> >
> > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan  wrote:

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Benchao Li
Thanks Jeyhun for bringing up this discussion, it is really exciting,
+1 for the general idea.

We also introduced a similar concept in Flink Batch internally to cope
with bucketed tables in Hive, it is a very important improvement.

> One thing to note is that for join queries, the parallelism of each join
> source might be different. This might result in
> inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
> different mappings of partitions to source operators).
> Therefore, it is the job of the planner to detect this and adjust the
> parallelism. With that in mind,
> the rest (how the split assigners perform) is consistent among many
> sources.

Could you elaborate a little more on this. I added my two cents here
about this part:
1. What parallelism would you take? E.g., 128 + 256 => 128? What
if we cannot have a good greatest common divisor, like 127 + 128,
could we just utilize one side's pre-partitioned attribute, and let
the other side just do the shuffle?
2. In our current shuffle remove design (FlinkExpandConversionRule),
we don't consider parallelism, we just remove unnecessary shuffles
according to the distribution columns. After this FLIP, the
parallelism may be bundled with source's partitions, then how will
this optimization accommodate with FlinkExpandConversionRule, will you
also change downstream operator's parallelisms if we want to also
remove subsequent shuffles?


Regarding the new optimization rule, have you also considered allowing
some non-strict mode like FlinkRelDistribution#requireStrict? For
example, the source is pre-partitioned by the a, b columns; if we are
consuming this source and do an aggregate on a, b, c, can we utilize
this optimization?

Jane Chan  wrote on Thu, Mar 14, 2024, at 15:24:
>
> Hi Jeyhun,
>
> Thanks for your clarification.
>
> > Once a new partition is detected, we add it to our existing mapping. Our
> mapping looks like Map> subtaskToPartitionAssignment,
> where it maps each source subtaskID to zero or more partitions.
>
> I understand your point.  **It would be better if you could sync the
> content to the FLIP**.
>
> Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be added to the table source (like
> filter/project pushdown)? It would be great if you could attach an example
> to the FLIP.
>
> Bests,
> Jane
>
> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov  wrote:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > partition
> > > information during runtime execution. For long-running jobs, partitions
> > may
> > > be continuously created. Is this FLIP equipped to handle such scenarios?
> >
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added, or in general, new splits are
> > discovered,
> > PartitionAwareSplitAssigner::addSplits(Collection
> > newSplits)
> > method will be called. Inside that method, we are able to detect if a split
> > belongs to existing partitions or there is a new partition.
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map> subtaskToPartitionAssignment,
> > where
> > it maps each source subtaskID to zero or more partitions.
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > within
> > > the Exchange node is consistent with the partition key defined in the
> > table
> > > source that implements `SupportsPartitioning`.
> >
> >
> > - Yes, I overlooked that point, fixed. Actually, the rule is much more
> > complicated. I tried to simplify it in the FLIP. Good point.
> >
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> >
> > - For compiled plan, PartitioningSpec will be used, with a json tag
> > "Partitioning". As a result, in the compiled plan, the source operator will
> > have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
> > More about the implementation details below:
> >
> > 
> > PartitioningSpec class
> > 
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >  // some code here
> > @Override
> > public void apply(DynamicTableSource tableSource, SourceAbilityContext
> > context) {
> > if (tableSource instanceof SupportsPartitioning) {
> > ((SupportsPartitioning) tableSource).applyPartitionedRead();
> > } else {
> > throw new TableException(
> > String.format(
> > "%s does not support SupportsPartitioning.",
> > tableSource.getClass().getName()));
> > }

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Jane Chan
Hi Jeyhun,

Thanks for your clarification.

> Once a new partition is detected, we add it to our existing mapping. Our
mapping looks like Map> subtaskToPartitionAssignment,
where it maps each source subtaskID to zero or more partitions.

I understand your point.  **It would be better if you could sync the
content to the FLIP**.

Another thing is I'm curious about what the physical plan looks like. Is
there any specific info that will be added to the table source (like
filter/project pushdown)? It would be great if you could attach an example
to the FLIP.

Bests,
Jane

On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov  wrote:

> Hi Jane,
>
> Thanks for your comments.
>
>
> 1. Concerning the `sourcePartitions()` method, the partition information
> > returned during the optimization phase may not be the same as the
> partition
> > information during runtime execution. For long-running jobs, partitions
> may
> > be continuously created. Is this FLIP equipped to handle such scenarios?
>
>
> - Good point. This scenario is definitely supported.
> Once a new partition is added, or in general, new splits are
> discovered,
> PartitionAwareSplitAssigner::addSplits(Collection
> newSplits)
> method will be called. Inside that method, we are able to detect if a split
> belongs to existing partitions or there is a new partition.
> Once a new partition is detected, we add it to our existing mapping. Our
> mapping looks like Map> subtaskToPartitionAssignment,
> where
> it maps each source subtaskID to zero or more partitions.
>
> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > understand that it is also necessary to verify whether the hash key
> within
> > the Exchange node is consistent with the partition key defined in the
> table
> > source that implements `SupportsPartitioning`.
>
>
> - Yes, I overlooked that point, fixed. Actually, the rule is much more
> complicated. I tried to simplify it in the FLIP. Good point.
>
>
> 3. Could you elaborate on the desired physical plan and integration with
> > `CompiledPlan` to enhance the overall functionality?
>
>
> - For compiled plan, PartitioningSpec will be used, with a json tag
> "Partitioning". As a result, in the compiled plan, the source operator will
> have
> "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
> More about the implementation details below:
>
> 
> PartitioningSpec class
> 
> @JsonTypeName("Partitioning")
> public final class PartitioningSpec extends SourceAbilitySpecBase {
>  // some code here
> @Override
> public void apply(DynamicTableSource tableSource, SourceAbilityContext
> context) {
> if (tableSource instanceof SupportsPartitioning) {
> ((SupportsPartitioning) tableSource).applyPartitionedRead();
> } else {
> throw new TableException(
> String.format(
> "%s does not support SupportsPartitioning.",
> tableSource.getClass().getName()));
> }
> }
>   // some code here
> }
>
> 
> SourceAbilitySpec class
> 
> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> JsonTypeInfo.As.PROPERTY, property = "type")
> @JsonSubTypes({
> @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> +  @JsonSubTypes.Type(value = PartitioningSpec.class)   //
> new added
>
>
>
> Please let me know if that answers your questions or if you have other
> comments.
>
> Regards,
> Jeyhun
>
>
> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan  wrote:
>
> > Hi Jeyhun,
> >
> > Thank you for leading the discussion. I'm generally +1 with this
> proposal,
> > along with some questions. Please see my comments below.
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > returned during the optimization phase may not be the same as the
> partition
> > information during runtime execution. For long-running jobs, partitions
> may
> > be continuously created. Is this FLIP equipped to handle such scenarios?
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > understand that it is also necessary to verify whether the hash key
> within
> > the Exchange node is consistent with the partition key defined in the
> table
> > source that implements `SupportsPartitioning`.
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > `CompiledPlan` to enhance the overall functionality?
> >
> > Best,

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-14 Thread Hang Ruan
Hi, Jeyhun.

Thanks for the FLIP. Totally +1 for it.

I have a question about the part `Additional option to disable this
optimization`. Is this option a source configuration or a table
configuration?

Besides that, there is a little mistake, unless I misunderstand something.
Should `Check if upstream_any is pre-partitioned data source AND contains
the same partition keys as the source` be changed as `Check if upstream_any
is pre-partitioned data source AND contains the same partition keys as
downstream_any` ?

Best,
Hang

Jeyhun Karimov  wrote on Wed, Mar 13, 2024, at 21:11:

> Hi Jane,
>
> Thanks for your comments.
>
>
> 1. Concerning the `sourcePartitions()` method, the partition information
> > returned during the optimization phase may not be the same as the
> partition
> > information during runtime execution. For long-running jobs, partitions
> may
> > be continuously created. Is this FLIP equipped to handle such scenarios?
>
>
> - Good point. This scenario is definitely supported.
> Once a new partition is added, or in general, new splits are
> discovered,
> PartitionAwareSplitAssigner::addSplits(Collection
> newSplits)
> method will be called. Inside that method, we are able to detect if a split
> belongs to existing partitions or there is a new partition.
> Once a new partition is detected, we add it to our existing mapping. Our
> mapping looks like Map> subtaskToPartitionAssignment,
> where
> it maps each source subtaskID to zero or more partitions.
>
> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > understand that it is also necessary to verify whether the hash key
> within
> > the Exchange node is consistent with the partition key defined in the
> table
> > source that implements `SupportsPartitioning`.
>
>
> - Yes, I overlooked that point, fixed. Actually, the rule is much more
> complicated. I tried to simplify it in the FLIP. Good point.
>
>
> 3. Could you elaborate on the desired physical plan and integration with
> > `CompiledPlan` to enhance the overall functionality?
>
>
> - For compiled plan, PartitioningSpec will be used, with a json tag
> "Partitioning". As a result, in the compiled plan, the source operator will
> have
> "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
> More about the implementation details below:
>
> 
> PartitioningSpec class
> 
> @JsonTypeName("Partitioning")
> public final class PartitioningSpec extends SourceAbilitySpecBase {
>  // some code here
> @Override
> public void apply(DynamicTableSource tableSource, SourceAbilityContext
> context) {
> if (tableSource instanceof SupportsPartitioning) {
> ((SupportsPartitioning) tableSource).applyPartitionedRead();
> } else {
> throw new TableException(
> String.format(
> "%s does not support SupportsPartitioning.",
> tableSource.getClass().getName()));
> }
> }
>   // some code here
> }
>
> 
> SourceAbilitySpec class
> 
> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> JsonTypeInfo.As.PROPERTY, property = "type")
> @JsonSubTypes({
> @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> +  @JsonSubTypes.Type(value = PartitioningSpec.class)   //
> new added
>
>
>
> Please let me know if that answers your questions or if you have other
> comments.
>
> Regards,
> Jeyhun
>
>
> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan  wrote:
>
> > Hi Jeyhun,
> >
> > Thank you for leading the discussion. I'm generally +1 with this
> proposal,
> > along with some questions. Please see my comments below.
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > returned during the optimization phase may not be the same as the
> partition
> > information during runtime execution. For long-running jobs, partitions
> may
> > be continuously created. Is this FLIP equipped to handle such scenarios?
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > understand that it is also necessary to verify whether the hash key
> within
> > the Exchange node is consistent with the partition key defined in the
> table
> > source that implements `SupportsPartitioning`.
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > `CompiledPlan` to enhance the overall functionality?
> >
> > Best,
> > Jane
> >
> > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes  wrote:

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jane,

Thanks for your comments.


1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle such scenarios?


- Good point. This scenario is definitely supported.
Once a new partition is added, or in general, new splits are
discovered, PartitionAwareSplitAssigner::addSplits(Collection
newSplits)
method will be called. Inside that method, we are able to detect if a split
belongs to existing partitions or there is a new partition.
Once a new partition is detected, we add it to our existing mapping. Our
mapping looks like Map> subtaskToPartitionAssignment,
where
it maps each source subtaskID to zero or more partitions.
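
For completeness, a hedged sketch of that bookkeeping (the real
PartitionAwareSplitAssigner is specified in the FLIP; the split type, the
generic types of the mapping, and the round-robin policy below are simplified
placeholders):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionAssignmentSketch {

    // Placeholder split type carrying the partition it belongs to,
    // e.g. "a=1/b=2" for a Hive-style filesystem layout.
    static final class PartitionedSplit {
        final String partition;
        PartitionedSplit(String partition) { this.partition = partition; }
    }

    // Maps each source subtask ID to zero or more partitions.
    private final Map<Integer, List<String>> subtaskToPartitionAssignment = new HashMap<>();
    private final Map<String, Integer> partitionToSubtask = new HashMap<>();
    private final int numSubtasks;

    PartitionAssignmentSketch(int numSubtasks) {
        this.numSubtasks = numSubtasks;
    }

    void addSplits(Collection<PartitionedSplit> newSplits) {
        for (PartitionedSplit split : newSplits) {
            // Known partition: the split keeps routing to the subtask that
            // already owns that partition. New partition: pick a subtask
            // (round-robin here) and record the assignment.
            partitionToSubtask.computeIfAbsent(split.partition, p -> {
                int subtask = partitionToSubtask.size() % numSubtasks;
                subtaskToPartitionAssignment
                        .computeIfAbsent(subtask, k -> new ArrayList<>())
                        .add(p);
                return subtask;
            });
        }
    }
}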

2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.


- Yes, I overlooked that point, fixed. Actually, the rule is much more
complicated. I tried to simplify it in the FLIP. Good point.


3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?


- For compiled plan, PartitioningSpec will be used, with a json tag
"Partitioning". As a result, in the compiled plan, the source operator will
have
"abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
More about the implementation details below:


PartitioningSpec class

@JsonTypeName("Partitioning")
public final class PartitioningSpec extends SourceAbilitySpecBase {
    // some code here
    @Override
    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
        if (tableSource instanceof SupportsPartitioning) {
            ((SupportsPartitioning) tableSource).applyPartitionedRead();
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsPartitioning.",
                            tableSource.getClass().getName()));
        }
    }
    // some code here
}


SourceAbilitySpec class

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = FilterPushDownSpec.class),
    @JsonSubTypes.Type(value = LimitPushDownSpec.class),
    @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
    @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
    @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
    @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
    @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
    @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
+   @JsonSubTypes.Type(value = PartitioningSpec.class)   // newly added



Please let me know if that answers your questions or if you have other
comments.

Regards,
Jeyhun


On Tue, Mar 12, 2024 at 8:56 AM Jane Chan  wrote:

> Hi Jeyhun,
>
> Thank you for leading the discussion. I'm generally +1 with this proposal,
> along with some questions. Please see my comments below.
>
> 1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle such scenarios?
>
> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.
>
> 3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?
>
> Best,
> Jane
>
> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes 
> wrote:
>
> > Hi Jeyhun,
> >
> > I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> > generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> > "partitions" (and maybe even situations where a data source is
> partitioned
> > and bucketed).
> >
> > Separate from that, the page mentions TPC-H Q1 as an example.  For a
> join,
> > any two tables joined on the same bucket key should provide a concrete
> > example of a join.  Systems like Kafka Streams/ksqlDB call this
> > "co-partitioning"; for those systems, it is a requirement placed on the
> > input sources.  For Flink, with FLIP-434, the proposed planner rule
> > could remove the shuffle.
> >
> > Definitely a fun idea; I look forward to hearing more!
> >
> > Cheers,
> >
> > Jim
> >
> >
> > 1.
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jim,


Thanks for your comments.

I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).


Now that I have gone through FLIP-376 [1] again, your suggestion definitely makes
sense.
For any source connector that can derive "DISTRIBUTION" metadata (e.g.,
distribution key/columns, bucket names, etc)
from the input source (e.g., given the dfs path), FLIP-434 [2] can and
should support reading pre-bucketed and/or pre-divided data.
I also added the relevant info to the FLIP-434 [2].


Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join.  Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources.  For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.


- Yes, with this proposal we might be able to remove the shuffle when
compared to Kafka Streams/ksqlDB.
One thing to note is that for join queries, the parallelism of each join
source might be different. This might result in
inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
different mappings of partitions to source operators).
Therefore, it is the job of the planner to detect this and adjust the
parallelism. With that in mind,
the rest (how the split assigners perform) is consistent among many
sources.

Please let me know if that answers your questions or if you have any other
comments.

Regards,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources


On Tue, Mar 12, 2024 at 4:11 AM Jim Hughes 
wrote:

> Hi Jeyhun,
>
> I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).
>
> Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join.  Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources.  For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.
>
> Definitely a fun idea; I look forward to hearing more!
>
> Cheers,
>
> Jim
>
>
> 1.
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> 2.
>
> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>
> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov 
> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion on FLIP-434: Support optimizations for
> > pre-partitioned data sources [1].
> >
> > The FLIP introduces taking advantage of pre-partitioned data sources for
> > SQL/Table API (it is already supported as experimental feature in
> > DataStream API [2]).
> >
> >
> > Please find more details in the FLIP wiki document [1].
> > Looking forward to your feedback.
> >
> > Regards,
> > Jeyhun
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> >
>


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-12 Thread Jane Chan
Hi Jeyhun,

Thank you for leading the discussion. I'm generally +1 with this proposal,
along with some questions. Please see my comments below.

1. Concerning the `sourcePartitions()` method, the partition information
returned during the optimization phase may not be the same as the partition
information during runtime execution. For long-running jobs, partitions may
be continuously created. Is this FLIP equipped to handle such scenarios?

2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
understand that it is also necessary to verify whether the hash key within
the Exchange node is consistent with the partition key defined in the table
source that implements `SupportsPartitioning`.

3. Could you elaborate on the desired physical plan and integration with
`CompiledPlan` to enhance the overall functionality?

Best,
Jane

On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes 
wrote:

> Hi Jeyhun,
>
> I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).
>
> Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join.  Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources.  For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.
>
> Definitely a fun idea; I look forward to hearing more!
>
> Cheers,
>
> Jim
>
>
> 1.
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> 2.
>
> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>
> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov 
> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion on FLIP-434: Support optimizations for
> > pre-partitioned data sources [1].
> >
> > The FLIP introduces taking advantage of pre-partitioned data sources for
> > SQL/Table API (it is already supported as experimental feature in
> > DataStream API [2]).
> >
> >
> > Please find more details in the FLIP wiki document [1].
> > Looking forward to your feedback.
> >
> > Regards,
> > Jeyhun
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> >
>


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-11 Thread Jim Hughes
Hi Jeyhun,

I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
"partitions" (and maybe even situations where a data source is partitioned
and bucketed).

Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
any two tables joined on the same bucket key should provide a concrete
example of a join.  Systems like Kafka Streams/ksqlDB call this
"co-partitioning"; for those systems, it is a requirement placed on the
input sources.  For Flink, with FLIP-434, the proposed planner rule
could remove the shuffle.

Definitely a fun idea; I look forward to hearing more!

Cheers,

Jim


1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
2.
https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements

On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov  wrote:

> Hi devs,
>
> I’d like to start a discussion on FLIP-434: Support optimizations for
> pre-partitioned data sources [1].
>
> The FLIP introduces taking advantage of pre-partitioned data sources for
> SQL/Table API (it is already supported as experimental feature in
> DataStream API [2]).
>
>
> Please find more details in the FLIP wiki document [1].
> Looking forward to your feedback.
>
> Regards,
> Jeyhun
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
>


[DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-10 Thread Jeyhun Karimov
Hi devs,

I’d like to start a discussion on FLIP-434: Support optimizations for
pre-partitioned data sources [1].

The FLIP introduces taking advantage of pre-partitioned data sources for
SQL/Table API (it is already supported as experimental feature in
DataStream API [2]).


Please find more details in the FLIP wiki document [1].
Looking forward to your feedback.

Regards,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/