KurtYoung commented on a change in pull request #12145:
URL: https://github.com/apache/flink/pull/12145#discussion_r426120399
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -60,7 +60,7 @@ object FlinkBatchRuleSets {
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
EnumerableToLogicalTableScan.INSTANCE)
- /**
+ /**p
Review comment:
typo
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
Review comment:
check the `false` condition first here and return early — that saves one level of indentation
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
+ SupportsProjectionPushDown pushDownSource =
(SupportsProjectionPushDown) tableSourceTable.tableSource();
+ if (pushDownSource.supportsNestedProjection()) {
+ throw new TableException("Nested projection
push down is unsupported now. \n" +
+ "Please disable nested
projection (SupportsProjectionPushDown#supportsNestedProjection returns false),
" +
+ "planner will push down the
top-level columns.");
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+
+ int[] usedFields =
RexNodeExtractor.extractRefInputFields(project.getProjects());
+ // if no fields can be projected, we keep the original plan.
+ if (scan.getRowType().getFieldCount() == usedFields.length) {
+ return;
+ }
+
+ TableSourceTable oldTableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource oldTableSource =
oldTableSourceTable.tableSource();
+
+ DynamicTableSource newTableSource = oldTableSource.copy();
Review comment:
```suggestion
DynamicTableSource newTableSource =
oldTableSourceTable.tableSource().copy();
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
+ SupportsProjectionPushDown pushDownSource =
(SupportsProjectionPushDown) tableSourceTable.tableSource();
+ if (pushDownSource.supportsNestedProjection()) {
+ throw new TableException("Nested projection
push down is unsupported now. \n" +
+ "Please disable nested
projection (SupportsProjectionPushDown#supportsNestedProjection returns false),
" +
+ "planner will push down the
top-level columns.");
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+
+ int[] usedFields =
RexNodeExtractor.extractRefInputFields(project.getProjects());
+ // if no fields can be projected, we keep the original plan.
+ if (scan.getRowType().getFieldCount() == usedFields.length) {
+ return;
+ }
+
+ TableSourceTable oldTableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource oldTableSource =
oldTableSourceTable.tableSource();
+
+ DynamicTableSource newTableSource = oldTableSource.copy();
+ SupportsProjectionPushDown newProjectPushDownSource =
(SupportsProjectionPushDown) newTableSource;
+
+ int[][] projectedFields = new int[usedFields.length][];
+ List<String> fieldNames = new ArrayList<>();
+ for (int i = 0; i < usedFields.length; ++i) {
+ int usedField = usedFields[i];
+ projectedFields[i] = new int[] { usedField };
+
fieldNames.add(scan.getRowType().getFieldNames().get(usedField));
+ }
+ newProjectPushDownSource.applyProjection(projectedFields);
+ FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory)
oldTableSourceTable.getRelOptSchema().getTypeFactory();
+ RelDataType newRowType =
flinkTypeFactory.projectStructType(oldTableSourceTable.getRowType(),
usedFields);
+
+ String[] newExtraDigests = Arrays.copyOf(
+ oldTableSourceTable.extraDigests(),
oldTableSourceTable.extraDigests().length + 1);
+ newExtraDigests[oldTableSourceTable.extraDigests().length] =
"select(" + String.join(", ", fieldNames) + ")";
Review comment:
how about changing `select` to `project`?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
+ SupportsProjectionPushDown pushDownSource =
(SupportsProjectionPushDown) tableSourceTable.tableSource();
+ if (pushDownSource.supportsNestedProjection()) {
+ throw new TableException("Nested projection
push down is unsupported now. \n" +
+ "Please disable nested
projection (SupportsProjectionPushDown#supportsNestedProjection returns false),
" +
+ "planner will push down the
top-level columns.");
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+
+ int[] usedFields =
RexNodeExtractor.extractRefInputFields(project.getProjects());
+ // if no fields can be projected, we keep the original plan.
+ if (scan.getRowType().getFieldCount() == usedFields.length) {
+ return;
+ }
+
+ TableSourceTable oldTableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource oldTableSource =
oldTableSourceTable.tableSource();
+
+ DynamicTableSource newTableSource = oldTableSource.copy();
+ SupportsProjectionPushDown newProjectPushDownSource =
(SupportsProjectionPushDown) newTableSource;
Review comment:
no need to declare this as a local variable because it is only used once
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
+ SupportsProjectionPushDown pushDownSource =
(SupportsProjectionPushDown) tableSourceTable.tableSource();
+ if (pushDownSource.supportsNestedProjection()) {
+ throw new TableException("Nested projection
push down is unsupported now. \n" +
+ "Please disable nested
projection (SupportsProjectionPushDown#supportsNestedProjection returns false),
" +
+ "planner will push down the
top-level columns.");
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+
+ int[] usedFields =
RexNodeExtractor.extractRefInputFields(project.getProjects());
+ // if no fields can be projected, we keep the original plan.
+ if (scan.getRowType().getFieldCount() == usedFields.length) {
+ return;
+ }
+
+ TableSourceTable oldTableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource oldTableSource =
oldTableSourceTable.tableSource();
+
+ DynamicTableSource newTableSource = oldTableSource.copy();
+ SupportsProjectionPushDown newProjectPushDownSource =
(SupportsProjectionPushDown) newTableSource;
+
+ int[][] projectedFields = new int[usedFields.length][];
+ List<String> fieldNames = new ArrayList<>();
+ for (int i = 0; i < usedFields.length; ++i) {
+ int usedField = usedFields[i];
+ projectedFields[i] = new int[] { usedField };
+
fieldNames.add(scan.getRowType().getFieldNames().get(usedField));
+ }
+ newProjectPushDownSource.applyProjection(projectedFields);
+ FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory)
oldTableSourceTable.getRelOptSchema().getTypeFactory();
+ RelDataType newRowType =
flinkTypeFactory.projectStructType(oldTableSourceTable.getRowType(),
usedFields);
+
+ String[] newExtraDigests = Arrays.copyOf(
+ oldTableSourceTable.extraDigests(),
oldTableSourceTable.extraDigests().length + 1);
+ newExtraDigests[oldTableSourceTable.extraDigests().length] =
"select(" + String.join(", ", fieldNames) + ")";
+ // project push down does not change the statistic, we can
reuse origin statistic
+ TableSourceTable newTableSourceTable = new TableSourceTable(
Review comment:
create a `copy` method just like the legacy one does
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner rule that pushes a {@link LogicalProject} into a {@link
LogicalTableScan}
+ * which wraps a {@link SupportsProjectionPushDown} dynamic table source.
+ *
+ * <p>NOTES: This rule does not support nested fields push down now,
+ * instead it will push the top-level column down just just like non-nested
fields.
+ */
+public class PushProjectIntoTableSourceScanRule extends RelOptRule {
+ public static final PushProjectIntoTableSourceScanRule INSTANCE = new
PushProjectIntoTableSourceScanRule();
+
+ public PushProjectIntoTableSourceScanRule() {
+ super(operand(LogicalProject.class,
+ operand(LogicalTableScan.class, none())),
+ "PushProjectIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ if (tableSourceTable != null && tableSourceTable.tableSource()
instanceof SupportsProjectionPushDown) {
+ SupportsProjectionPushDown pushDownSource =
(SupportsProjectionPushDown) tableSourceTable.tableSource();
+ if (pushDownSource.supportsNestedProjection()) {
+ throw new TableException("Nested projection
push down is unsupported now. \n" +
+ "Please disable nested
projection (SupportsProjectionPushDown#supportsNestedProjection returns false),
" +
+ "planner will push down the
top-level columns.");
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+
+ int[] usedFields =
RexNodeExtractor.extractRefInputFields(project.getProjects());
+ // if no fields can be projected, we keep the original plan.
+ if (scan.getRowType().getFieldCount() == usedFields.length) {
+ return;
+ }
+
+ TableSourceTable oldTableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource oldTableSource =
oldTableSourceTable.tableSource();
+
+ DynamicTableSource newTableSource = oldTableSource.copy();
+ SupportsProjectionPushDown newProjectPushDownSource =
(SupportsProjectionPushDown) newTableSource;
+
+ int[][] projectedFields = new int[usedFields.length][];
+ List<String> fieldNames = new ArrayList<>();
+ for (int i = 0; i < usedFields.length; ++i) {
+ int usedField = usedFields[i];
+ projectedFields[i] = new int[] { usedField };
+
fieldNames.add(scan.getRowType().getFieldNames().get(usedField));
+ }
+ newProjectPushDownSource.applyProjection(projectedFields);
+ FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory)
oldTableSourceTable.getRelOptSchema().getTypeFactory();
+ RelDataType newRowType =
flinkTypeFactory.projectStructType(oldTableSourceTable.getRowType(),
usedFields);
+
+ String[] newExtraDigests = Arrays.copyOf(
+ oldTableSourceTable.extraDigests(),
oldTableSourceTable.extraDigests().length + 1);
+ newExtraDigests[oldTableSourceTable.extraDigests().length] =
"select(" + String.join(", ", fieldNames) + ")";
+ // project push down does not change the statistic, we can
reuse origin statistic
+ TableSourceTable newTableSourceTable = new TableSourceTable(
+ oldTableSourceTable.getRelOptSchema(),
+ oldTableSourceTable.tableIdentifier(),
+ newRowType,
+ oldTableSourceTable.getStatistic(),
+ newTableSource,
+ oldTableSourceTable.isStreamingMode(),
+ oldTableSourceTable.catalogTable(),
+ oldTableSourceTable.dynamicOptions(),
+ newExtraDigests
+ );
+
+ LogicalTableScan newScan = new
LogicalTableScan(scan.getCluster(), scan.getTraitSet(), newTableSourceTable);
Review comment:
use a non-deprecated constructor
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.collection.Seq;
+
+/**
+ * Test implementation of {@link DynamicTableSourceFactory} that supports
projection push down.
+ */
+public class TestProjectableValuesTableFactory implements
DynamicTableSourceFactory {
+
+ //
--------------------------------------------------------------------------------------------
+ // Data Registration
+ //
--------------------------------------------------------------------------------------------
+
+ private static final AtomicInteger idCounter = new AtomicInteger(0);
+ private static final Map<String, Collection<Tuple2<RowKind, Row>>>
registeredData = new HashMap<>();
+
+ /**
+ * Register the given data into the data factory context and return the
data id.
+ * The data id can be used as a reference to the registered data in
data connector DDL.
+ */
+ public static String registerData(Collection<Row> data) {
+ List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>();
+ for (Row row : data) {
+ dataWithKinds.add(Tuple2.of(RowKind.INSERT, row));
+ }
+ return registerChangelogData(dataWithKinds);
+ }
+
+ /**
+ * Register the given data into the data factory context and return the
data id.
+ * The data id can be used as a reference to the registered data in
data connector DDL.
+ */
+ public static String registerData(Seq<Row> data) {
+ return registerData(JavaScalaConversionUtil.toJava(data));
+ }
+
+ /**
+ * Register the given data with RowKind into the data factory context
and return the data id.
+ * The data id can be used as a reference to the registered data in
data connector DDL.
+ * TODO: remove this utility once Row supports RowKind.
+ */
+ public static String registerChangelogData(Collection<Tuple2<RowKind,
Row>> data) {
+ String id = String.valueOf(idCounter.incrementAndGet());
+ registeredData.put(id, data);
+ return id;
+ }
+
+ /**
+ * Removes the registered data under the given data id.
+ */
+ public static void clearAllRegisteredData() {
+ registeredData.clear();
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Factory
+ //
--------------------------------------------------------------------------------------------
+
+ private static final String IDENTIFIER = "projectable-values";
+
+ private static final ConfigOption<String> DATA_ID = ConfigOptions
+ .key("data-id")
+ .stringType()
+ .defaultValue(null);
+
+ private static final ConfigOption<Boolean> BOUNDED = ConfigOptions
+ .key("bounded")
+ .booleanType()
+ .defaultValue(false);
+
+ private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions
+ .key("changelog-mode")
+ .stringType()
+ .defaultValue("I"); // all available "I,UA,UB,D"
+
+ private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions
+ .key("runtime-source")
+ .stringType()
+ .defaultValue("SourceFunction"); // another is
"InputFormat"
+
+ private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED
= ConfigOptions
+ .key("nested-projection-supported")
+ .booleanType()
+ .defaultValue(false);
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+ ChangelogMode changelogMode =
parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
+ String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
+ boolean isBounded = helper.getOptions().get(BOUNDED);
+ String dataId = helper.getOptions().get(DATA_ID);
+ boolean nestedProjectionSupported =
helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
+
+ Collection<Tuple2<RowKind, Row>> data =
registeredData.getOrDefault(dataId, Collections.emptyList());
+ DataType rowDataType =
context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ return new TestProjectableValuesTableSource(
+ changelogMode,
+ isBounded,
+ runtimeSource,
+ rowDataType,
+ data,
+ nestedProjectionSupported);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>(Arrays.asList(
+ DATA_ID,
+ CHANGELOG_MODE,
+ BOUNDED,
+ RUNTIME_SOURCE,
+ NESTED_PROJECTION_SUPPORTED));
+ }
+
+ private ChangelogMode parseChangelogMode(String string) {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (String split : string.split(",")) {
+ switch (split.trim()) {
+ case "I":
+
builder.addContainedKind(RowKind.INSERT);
+ break;
+ case "UB":
+
builder.addContainedKind(RowKind.UPDATE_BEFORE);
+ break;
+ case "UA":
+
builder.addContainedKind(RowKind.UPDATE_AFTER);
+ break;
+ case "D":
+
builder.addContainedKind(RowKind.DELETE);
+ break;
+ default:
+ throw new
IllegalArgumentException("Invalid ChangelogMode string: " + string);
+ }
+ }
+ return builder.build();
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Table source
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Values {@link DynamicTableSource} for testing.
+ */
+ private static class TestProjectableValuesTableSource implements
ScanTableSource, SupportsProjectionPushDown {
+
+ private final ChangelogMode changelogMode;
+ private final boolean bounded;
+ private final String runtimeSource;
+ private DataType physicalRowDataType;
+ private final Collection<Tuple2<RowKind, Row>> data;
+ private final boolean nestedProjectionSupported;
+ private int[] projectedFields = null;
+
+ private TestProjectableValuesTableSource(
+ ChangelogMode changelogMode,
+ boolean bounded, String runtimeSource,
+ DataType physicalRowDataType,
+ Collection<Tuple2<RowKind, Row>> data,
+ boolean nestedProjectionSupported) {
+ this.changelogMode = changelogMode;
+ this.bounded = bounded;
+ this.runtimeSource = runtimeSource;
+ this.physicalRowDataType = physicalRowDataType;
+ this.data = data;
+ this.nestedProjectionSupported =
nestedProjectionSupported;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return changelogMode;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ScanRuntimeProvider
getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
+ TypeSerializer<RowData> serializer =
(TypeSerializer<RowData>) runtimeProviderContext
+
.createTypeInformation(physicalRowDataType)
+ .createSerializer(new
ExecutionConfig());
+ DataStructureConverter converter =
runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+ Collection<RowData> values = convertToRowData(data,
projectedFields, converter);
+
+ if (runtimeSource.equals("SourceFunction")) {
+ try {
+ return SourceFunctionProvider.of(
+ new
FromElementsFunction<>(serializer, values),
+ bounded);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (runtimeSource.equals("InputFormat")) {
+ return InputFormatProvider.of(new
CollectionInputFormat<>(values, serializer));
+ } else {
+ throw new IllegalArgumentException("Unsupported
runtime source class: " + runtimeSource);
+ }
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ TestProjectableValuesTableSource newTableSource = new
TestProjectableValuesTableSource(
+ changelogMode, bounded, runtimeSource,
physicalRowDataType, data, nestedProjectionSupported);
+ newTableSource.projectedFields = projectedFields;
+ return newTableSource;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "TestProjectableValues";
+ }
+
+ private static Collection<RowData> convertToRowData(
+ Collection<Tuple2<RowKind, Row>> data,
+ @Nullable int[] projectedFields,
+ DataStructureConverter converter) {
+ List<RowData> result = new ArrayList<>();
+ for (Tuple2<RowKind, Row> value : data) {
+ Row projectedRow;
+ if (projectedFields == null) {
+ projectedRow = value.f1;
+ } else {
+ Object[] newValues = new
Object[projectedFields.length];
+ for (int i = 0; i <
projectedFields.length; ++i) {
+ newValues[i] =
value.f1.getField(projectedFields[i]);
+ }
+ projectedRow = Row.of(newValues);
+ }
+ RowData rowData = (RowData)
converter.toInternal(projectedRow);
+ if (rowData != null) {
+ rowData.setRowKind(value.f0);
+ result.add(rowData);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean supportsNestedProjection() {
+ return nestedProjectionSupported;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) {
+ this.projectedFields = new int[projectedFields.length];
+ FieldsDataType dataType = (FieldsDataType)
physicalRowDataType;
+ RowType rowType = ((RowType)
physicalRowDataType.getLogicalType());
+ DataTypes.Field[] fields = new
DataTypes.Field[projectedFields.length];
+ for (int i = 0; i < projectedFields.length; ++i) {
+ int[] projection = projectedFields[i];
+ Preconditions.checkArgument(projection.length
== 1);
+ int index = projection[0];
+ this.projectedFields[i] = index;
+ String fieldName =
rowType.getFieldNames().get(index);
+ fields[i] = DataTypes.FIELD(fieldName,
dataType.getFieldDataTypes().get(fieldName));
Review comment:
`FieldsDataType#getFieldDataTypes` has been deleted; derive the field data types another way (e.g. from the `DataType`'s children) — please verify against the current API.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]