[ 
https://issues.apache.org/jira/browse/BEAM-8468?focusedWorklogId=335232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335232
 ]

ASF GitHub Bot logged work on BEAM-8468:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Oct/19 22:16
            Start Date: 28/Oct/19 22:16
    Worklog Time Spent: 10m 
      Work Description: apilloud commented on pull request #9863: [BEAM-8468] Predicate push down for in memory table
URL: https://github.com/apache/beam/pull/9863#discussion_r339810820
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
 ##########
 @@ -88,37 +86,66 @@ public void onMatch(RelOptRuleCall call) {
     final Pair<ImmutableList<RexNode>, ImmutableList<RexNode>> projectFilter = program.split();
     final RelDataType calcInputRowType = program.getInputRowType();
 
+    final BeamSqlTableFilter tableFilter = beamSqlTable.constructFilter(projectFilter.right);
+    if (!beamSqlTable.supportsProjects() && tableFilter instanceof DefaultTableFilter) {
+      // Either project or filter push-down must be supported by the IO.
+      return;
+    }
+
+    if (!(tableFilter instanceof DefaultTableFilter) && !beamSqlTable.supportsProjects()) {
+      // Filter push-down without project push-down is not supported for now.
+      return;
+    }
+
     // Find all input refs used by projects
-    Set<String> usedFields = new HashSet<>();
+    boolean hasComplexProjects = false;
+    Set<String> usedFields = new LinkedHashSet<>();
     for (RexNode project : projectFilter.left) {
       findUtilizedInputRefs(calcInputRowType, project, usedFields);
+      if (!hasComplexProjects && project instanceof RexCall) {
+        // Ex: 'SELECT field+10 FROM table'
+        hasComplexProjects = true;
+      }
     }
 
     // Find all input refs used by filters
-    for (RexNode filter : projectFilter.right) {
+    for (RexNode filter : tableFilter.getNotSupported()) {
       findUtilizedInputRefs(calcInputRowType, filter, usedFields);
     }
 
     FieldAccessDescriptor resolved =
-        FieldAccessDescriptor.withFieldNames(usedFields).resolve(beamSqlTable.getSchema());
+        FieldAccessDescriptor.withFieldNames(usedFields)
+            .withOrderByFieldInsertionOrder()
+            .resolve(beamSqlTable.getSchema());
     Schema newSchema =
         SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
     RelDataType calcInputType =
         CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
 
-    // Check if the calc can be dropped
-    if (isProjectRenameOnlyProgram(program)) {
-      call.transformTo(ioSourceRel.copy(calc.getRowType(), newSchema.getFieldNames()));
+    // Check if the calc can be dropped:
+    // 1. There are no complex projects.
+    // 2. And
+    // 2.1. Calc only does projects and renames.
+    //      Or
+    // 2.2. Predicate can be completely pushed-down to IO level.
+    if (!hasComplexProjects
 
 Review comment:
   This seems a little complex and possibly wrong. I think it should be:
   1. Projects are rename only, and
   2. Predicate can be completely pushed-down.
   
   To put this in code: `isProjectRenameOnly(program) && tableFilter.getNotSupported().isEmpty()`,
   where `isProjectRenameOnly` is `isProjectRenameOnlyProgram` without the condition check.
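
The proposed condition can be sketched in isolation as follows. This is a minimal illustration, not the Beam implementation: `CalcDropCheck`, `canDropCalc`, and its parameters are hypothetical stand-ins for the result of `isProjectRenameOnly(program)` and the list returned by `tableFilter.getNotSupported()`.

```java
import java.util.List;

// Hypothetical, self-contained sketch of the suggested "can the Calc be dropped?" check.
final class CalcDropCheck {

    private CalcDropCheck() {}

    /**
     * @param projectsRenameOnly  stand-in for isProjectRenameOnly(program)
     * @param notSupportedFilters stand-in for tableFilter.getNotSupported(),
     *                            i.e. predicates the IO cannot evaluate itself
     * @return true only when projects are rename-only AND every predicate is pushed down
     */
    static boolean canDropCalc(boolean projectsRenameOnly, List<String> notSupportedFilters) {
        return projectsRenameOnly && notSupportedFilters.isEmpty();
    }

    public static void main(String[] args) {
        // Rename-only projects, all predicates handled by the IO.
        System.out.println(canDropCalc(true, List.of()));
        // Rename-only projects, but one predicate must still run in the Calc.
        System.out.println(canDropCalc(true, List.of("unsupported predicate")));
        // Predicates fully pushed down, but projects do more than rename.
        System.out.println(canDropCalc(false, List.of()));
    }
}
```

Under this formulation a single conjunction decides whether the Calc is dropped, so no separate `hasComplexProjects` flag is needed for the decision.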
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 335232)
    Time Spent: 50m  (was: 40m)

> Add predicate/filter push-down capability to IO APIs
> ----------------------------------------------------
>
>                 Key: BEAM-8468
>                 URL: https://issues.apache.org/jira/browse/BEAM-8468
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Kirill Kozlov
>            Assignee: Kirill Kozlov
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> * InMemoryTable should implement the following methods:
> {code:java}
> public PCollection<Row> buildIOReader(
>     PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames);
> public BeamSqlTableFilter constructFilter(List<RexNode> filter);
> {code}
>  * Update the push-down rule to support predicate/filter push-down.
>  * Create a class
> {code:java}
> class TestTableFilter implements BeamSqlTableFilter
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
