fsk119 edited a comment on pull request #15307:
URL: https://github.com/apache/flink/pull/15307#issuecomment-814050066


   
   > > > @fsk119 A question about the implementation:
   > > > ```
   > > > if (result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0) {
   > > >   call.transformTo(newScan);
   > > > } else { .. }
   > > > ```
   > > > 
   > > > 
   > > > If there are no remaining filters, should we not still include the Calc in the plan? Currently the rule only emits the scan, so any projection in the Calc is lost.
   > > > **Edit:**
   > > > Modifying the rule to:
   > > > ```java
   > > >         SupportsFilterPushDown.Result result = pushdownResultWithScan._1;
   > > >         FlinkLogicalTableSourceScan newScan = pushdownResultWithScan._2;
   > > > 
   > > >         // build new calc program
   > > >         RexProgramBuilder programBuilder =
   > > >                 new RexProgramBuilder(newScan.getRowType(), call.builder().getRexBuilder());
   > > > 
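   > > >         RexNode[] unconvertedPredicates = extractedPredicates._2;
   > > >         // keep a condition when the source left filters unconsumed
   > > >         // or when some predicates could not be converted at all
   > > >         if (!result.getRemainingFilters().isEmpty() || unconvertedPredicates.length > 0) {
   > > >             RexNode remainingCondition = getRemainingConditions(
   > > >                     relBuilder,
   > > >                     result,
   > > >                     unconvertedPredicates);
   > > > 
   > > >             programBuilder.addCondition(remainingCondition);
   > > >         }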
   > > > 
   > > >         List<RexNode> projects =
   > > >                 originProgram.getProjectList().stream()
   > > >                         .map(originProgram::expandLocalRef)
   > > >                         .collect(Collectors.toList());
   > > >         List<String> outputFieldNames = originProgram.getOutputRowType().getFieldNames();
   > > > 
   > > >         for (int i = 0; i < projects.size(); i++) {
   > > >             programBuilder.addProject(projects.get(i), outputFieldNames.get(i));
   > > >         }
   > > > 
   > > >         FlinkLogicalCalc newCalc =
   > > >                 FlinkLogicalCalc.create(newScan, programBuilder.getProgram());
   > > >         call.transformTo(newCalc);
   > > > ```
   > > > 
   > > > 
   > > > This fixes the issue. We always emit the Calc (instead of emitting the bare scan when no predicates are left) and only check whether any remaining predicates need to be applied.
   > > 
   > > 
   > > If the Calc has neither a projection nor a filter, I think we can transform the call to the scan directly?
   > 
   > Does a Calc not represent a projection as well as a filter?
   
   Yes.
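
To make the condition discussed above concrete, here is a minimal sketch of the decision: emit the bare scan only when the calc program neither filters nor changes the projection, and otherwise keep a Calc on top of the new scan. This is a toy model, not the rule's code. `CalcOrScanSketch`, `Program`, and `rewrite` are hypothetical names, and plans are plain strings. In the real rule, Calcite's `RexProgram#isTrivial()` looks like the matching check, but I have not verified it against this rule.

```java
import java.util.List;
import java.util.Optional;

/** Toy model only: none of these types are Flink or Calcite classes. */
public class CalcOrScanSketch {

    /** Hypothetical stand-in for a calc program: projected fields plus an optional condition. */
    static final class Program {
        final List<String> inputFields;
        final List<String> projects;
        final Optional<String> condition;

        Program(List<String> inputFields, List<String> projects, Optional<String> condition) {
            this.inputFields = inputFields;
            this.projects = projects;
            this.condition = condition;
        }

        /** True when there is no condition and the input fields are forwarded unchanged. */
        boolean isTrivial() {
            return !condition.isPresent() && projects.equals(inputFields);
        }
    }

    /** Returns the bare scan for a trivial program, otherwise a calc on top of the scan. */
    static String rewrite(String scan, Program program) {
        if (program.isTrivial()) {
            return scan;
        }
        String where = program.condition.map(c -> ", where=[" + c + "]").orElse("");
        return "Calc(select=" + program.projects + where + ") <- " + scan;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("a", "b");
        // No filter, identity projection: the calc can be dropped.
        System.out.println(rewrite("Scan(t)", new Program(fields, fields, Optional.empty())));
        // No filter, but a real projection: the calc must stay.
        System.out.println(rewrite("Scan(t)", new Program(fields, List.of("a"), Optional.empty())));
        // Identity projection, but a remaining filter: the calc must stay.
        System.out.println(rewrite("Scan(t)", new Program(fields, fields, Optional.of("b > 1"))));
    }
}
```

The second case is the one the original check missed: with all filters pushed down, a non-identity projection still needs the Calc.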
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

