Hi,

I agree with Becket that we may need to extend FieldReferenceExpression to support nested field access (or maybe add a new NestedFieldReferenceExpression). But I have some concerns about evolving SupportsProjectionPushDown.applyProjection. A projection is much simpler than a filter expression: it only needs to represent field indexes. If we evolve `applyProjection` to accept `List<FieldReferenceExpression> projectedFields`, users have to convert the `List<FieldReferenceExpression>` back to int[][], which is extra overhead for them. Field indexes (int[][]) are required to project schemas with the utility org.apache.flink.table.connector.Projection.
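To make the conversion concern concrete, here is a minimal sketch of the extra step a connector would likely need if projections arrived as expression objects. `NestedFieldRef` and `ProjectionSketch` are hypothetical stand-ins invented for illustration. They are not Flink classes, and a nested-capable FieldReferenceExpression does not exist yet.

```java
import java.util.List;

/** Hypothetical stand-in for a nested-capable field reference; NOT a Flink class. */
final class NestedFieldRef {
    final String name;
    // Index path from the top-level row, e.g. {1, 2} = child 2 of top-level field 1.
    final int[] indexPath;

    NestedFieldRef(String name, int... indexPath) {
        this.name = name;
        this.indexPath = indexPath.clone();
    }
}

/** Hypothetical helper showing the conversion back to index paths. */
final class ProjectionSketch {
    /** Flattens expression-style references back into the int[][] form used today. */
    static int[][] toIndexPaths(List<NestedFieldRef> refs) {
        int[][] paths = new int[refs.size()][];
        for (int i = 0; i < refs.size(); i++) {
            // Copy defensively so callers cannot mutate the reference's path.
            paths[i] = refs.get(i).indexPath.clone();
        }
        return paths;
    }
}
```

With the current int[][] signature this step is unnecessary: the index paths can be handed to the Projection utility as they are.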
Best,
Jark

On Wed, 2 Aug 2023 at 07:40, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Thanks Becket for the suggestion. That makes sense. Let me try it out and
> get back to you.
>
> Regards
> Venkata krishnan
>
> On Tue, Aug 1, 2023 at 9:04 AM Becket Qin <becket....@gmail.com> wrote:
>
> > This is a very useful feature in practice.
> >
> > It looks to me that the key issue here is that Flink ResolvedExpression
> > does not have necessary abstraction for nested field access. So the Calcite
> > RexFieldAccess does not have a counterpart in the ResolvedExpression. The
> > FieldReferenceExpression only supports direct access to the fields, not
> > nested access.
> >
> > Theoretically speaking, this nested field reference is also required by
> > projection pushdown. However, we addressed that by using an int[][] in the
> > SupportsProjectionPushDown interface. Maybe we can do the following:
> >
> > 1. Extend the FieldReferenceExpression to include an int[] for nested field
> > access,
> > 2. By doing (1),
> > SupportsFilterPushDown#applyFilters(List<ResolvedExpression>) can support
> > nested field access.
> > 3. Evolve the SupportsProjectionPushDown.applyProjection(int[][]
> > projectedFields, DataType producedDataType) to
> > applyProjection(List<FieldReferenceExpression> projectedFields, DataType
> > producedDataType)
> >
> > This will need a FLIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 1, 2023 at 11:42 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> >
> > > Thanks for the response. Looking forward to your pointers. In the
> > > meanwhile, let me figure out how we can implement it. Will keep you posted.
> > >
> > > On Mon, Jul 31, 2023, 11:43 PM liu ron <ron9....@gmail.com> wrote:
> > >
> > > > Hi, Venkata
> > > >
> > > > Thanks for reporting this issue. Currently, Flink doesn't support nested
> > > > filter pushdown.
> > > > I also think that this optimization would be useful,
> > > > especially for jobs, which may need to read a lot of data from the parquet
> > > > or orc file. We didn't move forward with this for some priority reasons.
> > > >
> > > > Regarding your three questions, I will respond to you later after my
> > > > on-call is finished because I need to dive into the source code. About your
> > > > commit, I don't think it's the right solution because
> > > > FieldReferenceExpression doesn't currently support nested field filter
> > > > pushdown, maybe we need to extend it.
> > > >
> > > > You can also look further into reasonable solutions, which we'll discuss
> > > > further later on.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > On Sat, 29 Jul 2023 at 03:31, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Currently, I am working on adding support for nested fields filter push
> > > > > down. In our use case running Flink on Batch, we found nested fields filter
> > > > > push down is key - without it, it is significantly slow. Note: Spark SQL
> > > > > supports nested fields filter push down.
> > > > >
> > > > > While debugging the code using IcebergTableSource as the table source,
> > > > > narrowed down the issue to missing support for
> > > > > RexNodeExtractor#RexNodeToExpressionConverter#visitFieldAccess.
> > > > > As part of fixing it, I made changes by returning an
> > > > > Option(FieldReferenceExpression)
> > > > > with appropriate reference to the parent index and the child index for the
> > > > > nested field with the data type info.
> > > > >
> > > > > But this new ResolvedExpression cannot be converted to RexNode which
> > > > > happens in PushFilterIntoSourceScanRuleBase
> > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java*L104__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Q5cMnVs$>.
> > > > >
> > > > > Few questions
> > > > >
> > > > > 1. Does FieldReferenceExpression support nested fields currently or should
> > > > > it be extended to support nested fields? I couldn't figure this out from
> > > > > the PushProjectIntoTableScanRule that supports nested column projection
> > > > > push down.
> > > > > 2. ExpressionConverter
> > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java*L197__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Z6jnkJm$>
> > > > > converts ResolvedExpression -> RexNode but the new FieldReferenceExpression
> > > > > with the nested field cannot be converted to RexNode. This is why the
> > > > > answer to the 1st question is key.
> > > > > 3. Anything else that I'm missing here? or is there an even easier way to
> > > > > add support for nested fields filter push down?
> > > > >
> > > > > Partially working changes - Commit
> > > > > <https://urldefense.com/v3/__https://github.com/venkata91/flink/commit/00cdf34ecf9be3ba669a97baaed4b69b85cd26f9__;!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9XeOjJ_a$>
> > > > > Please feel free to leave a comment directly in the commit.
> > > > >
> > > > > Any pointers here would be much appreciated! Thanks in advance.
> > > > >
> > > > > Disclaimer: Relatively new to Flink code base especially Table planner :-).
> > > > >
> > > > > Regards
> > > > > Venkata krishnan