This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 71e64989ee0b7109271ced5e1af03cc3694783e9 Author: Jark Wu <[email protected]> AuthorDate: Mon May 18 17:40:17 2020 +0800 [hotfix][table] Improve testing implementation for the new projection push down --- .../apache/flink/table/utils/TableSchemaUtils.java | 20 ++ .../flink/table/utils/TableSchemaUtilsTest.java | 48 +++ .../TestProjectableValuesTableFactory.java | 326 --------------------- .../planner/factories/TestValuesTableFactory.java | 79 ++++- .../PushProjectIntoTableSourceScanRuleTest.java | 6 +- .../org.apache.flink.table.factories.Factory | 1 - .../planner/plan/stream/sql/TableScanTest.xml | 2 +- .../planner/plan/batch/sql/TableSourceTest.scala | 4 +- .../planner/plan/stream/sql/TableSourceTest.scala | 20 +- .../plan/stream/table/TableSourceTest.scala | 20 +- .../planner/runtime/batch/sql/CalcITCase.scala | 11 +- .../planner/runtime/stream/sql/CalcITCase.scala | 10 +- .../planner/runtime/utils/BatchTestBase.scala | 3 +- .../planner/runtime/utils/StreamingTestBase.scala | 5 +- 14 files changed, 170 insertions(+), 385 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index 4863cb2..67ee125 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions; import java.util.List; import java.util.Optional; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Utilities to {@link TableSchema}. */ @@ -62,6 +64,24 @@ public class TableSchemaUtils { } /** + * Creates a new {@link TableSchema} with the projected fields from another {@link TableSchema}. + * The new {@link TableSchema} doesn't contain any primary key or watermark information. + * + * @see org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown + */ + public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) { + checkArgument(!containsGeneratedColumns(tableSchema), "It's illegal to project on a schema contains computed columns."); + TableSchema.Builder schemaBuilder = TableSchema.builder(); + List<TableColumn> tableColumns = tableSchema.getTableColumns(); + for (int[] fieldPath : projectedFields) { + checkArgument(fieldPath.length == 1, "Nested projection push down is not supported yet."); + TableColumn column = tableColumns.get(fieldPath[0]); + schemaBuilder.field(column.getName(), column.getType()); + } + return schemaBuilder.build(); + } + + /** * Returns true if there are any generated columns in the given {@link TableColumn}. */ public static boolean containsGeneratedColumns(TableSchema schema) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java index e96ddd9..3e760b1 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java @@ -70,4 +70,52 @@ public class TableSchemaUtilsTest { exceptionRule.expectMessage("Constraint ct2 to drop does not exist"); TableSchemaUtils.dropConstraint(oriSchema, "ct2"); } + + @Test + public void testInvalidProjectSchema() { + { + TableSchema schema = TableSchema.builder() + .field("a", DataTypes.INT().notNull()) + .field("b", DataTypes.STRING()) + .field("c", DataTypes.INT(), "a + 1") + .field("t", DataTypes.TIMESTAMP(3)) + .primaryKey("ct1", new String[]{"a"}) + .watermark("t", "t", DataTypes.TIMESTAMP(3)) + .build(); + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("It's illegal to project on a schema contains computed columns."); + int[][] projectedFields = {{1}}; + TableSchemaUtils.projectSchema(schema, projectedFields); + } + + { + TableSchema schema = TableSchema.builder() + .field("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING()))) + .field("b", DataTypes.STRING()) + .build(); + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("Nested projection push down is not supported yet."); + int[][] projectedFields = {{0, 1}}; + TableSchemaUtils.projectSchema(schema, projectedFields); + } + } + + @Test + public void testProjectSchema() { + TableSchema schema = TableSchema.builder() + .field("a", DataTypes.INT().notNull()) + .field("b", DataTypes.STRING()) + .field("t", DataTypes.TIMESTAMP(3)) + .primaryKey("a") + .watermark("t", "t", DataTypes.TIMESTAMP(3)) + .build(); + + int[][] projectedFields = {{2}, {0}}; + TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields); + TableSchema expected = TableSchema.builder() + .field("t", DataTypes.TIMESTAMP(3)) + .field("a", DataTypes.INT().notNull()) + .build(); + assertEquals(expected, projected); + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java deleted file mode 100644 index c5367ea..0000000 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.factories; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.io.CollectionInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.InputFormatProvider; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceFunctionProvider; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.DynamicTableSourceFactory; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.FieldsDataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import scala.collection.Seq; - -/** - * Test implementation of {@link DynamicTableSourceFactory} that supports projection push down. - */ -public class TestProjectableValuesTableFactory implements DynamicTableSourceFactory { - - // -------------------------------------------------------------------------------------------- - // Data Registration - // -------------------------------------------------------------------------------------------- - - private static final AtomicInteger idCounter = new AtomicInteger(0); - private static final Map<String, Collection<Tuple2<RowKind, Row>>> registeredData = new HashMap<>(); - - /** - * Register the given data into the data factory context and return the data id. - * The data id can be used as a reference to the registered data in data connector DDL. - */ - public static String registerData(Collection<Row> data) { - List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>(); - for (Row row : data) { - dataWithKinds.add(Tuple2.of(RowKind.INSERT, row)); - } - return registerChangelogData(dataWithKinds); - } - - /** - * Register the given data into the data factory context and return the data id. - * The data id can be used as a reference to the registered data in data connector DDL. - */ - public static String registerData(Seq<Row> data) { - return registerData(JavaScalaConversionUtil.toJava(data)); - } - - /** - * Register the given data with RowKind into the data factory context and return the data id. - * The data id can be used as a reference to the registered data in data connector DDL. - * TODO: remove this utility once Row supports RowKind. - */ - public static String registerChangelogData(Collection<Tuple2<RowKind, Row>> data) { - String id = String.valueOf(idCounter.incrementAndGet()); - registeredData.put(id, data); - return id; - } - - /** - * Removes the registered data under the given data id. - */ - public static void clearAllRegisteredData() { - registeredData.clear(); - } - - // -------------------------------------------------------------------------------------------- - // Factory - // -------------------------------------------------------------------------------------------- - - private static final String IDENTIFIER = "projectable-values"; - - private static final ConfigOption<String> DATA_ID = ConfigOptions - .key("data-id") - .stringType() - .defaultValue(null); - - private static final ConfigOption<Boolean> BOUNDED = ConfigOptions - .key("bounded") - .booleanType() - .defaultValue(false); - - private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions - .key("changelog-mode") - .stringType() - .defaultValue("I"); // all available "I,UA,UB,D" - - private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions - .key("runtime-source") - .stringType() - .defaultValue("SourceFunction"); // another is "InputFormat" - - private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions - .key("nested-projection-supported") - .booleanType() - .defaultValue(false); - - @Override - public String factoryIdentifier() { - return IDENTIFIER; - } - - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validate(); - ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE)); - String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE); - boolean isBounded = helper.getOptions().get(BOUNDED); - String dataId = helper.getOptions().get(DATA_ID); - boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED); - - Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList()); - DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); - return new TestProjectableValuesTableSource( - changelogMode, - isBounded, - runtimeSource, - rowDataType, - data, - nestedProjectionSupported); - } - - @Override - public Set<ConfigOption<?>> requiredOptions() { - return Collections.emptySet(); - } - - @Override - public Set<ConfigOption<?>> optionalOptions() { - return new HashSet<>(Arrays.asList( - DATA_ID, - CHANGELOG_MODE, - BOUNDED, - RUNTIME_SOURCE, - NESTED_PROJECTION_SUPPORTED)); - } - - private ChangelogMode parseChangelogMode(String string) { - ChangelogMode.Builder builder = ChangelogMode.newBuilder(); - for (String split : string.split(",")) { - switch (split.trim()) { - case "I": - builder.addContainedKind(RowKind.INSERT); - break; - case "UB": - builder.addContainedKind(RowKind.UPDATE_BEFORE); - break; - case "UA": - builder.addContainedKind(RowKind.UPDATE_AFTER); - break; - case "D": - builder.addContainedKind(RowKind.DELETE); - break; - default: - throw new IllegalArgumentException("Invalid ChangelogMode string: " + string); - } - } - return builder.build(); - } - - // -------------------------------------------------------------------------------------------- - // Table source - // -------------------------------------------------------------------------------------------- - - /** - * Values {@link DynamicTableSource} for testing. - */ - private static class TestProjectableValuesTableSource implements ScanTableSource, SupportsProjectionPushDown { - - private final ChangelogMode changelogMode; - private final boolean bounded; - private final String runtimeSource; - private DataType physicalRowDataType; - private final Collection<Tuple2<RowKind, Row>> data; - private final boolean nestedProjectionSupported; - private int[] projectedFields = null; - - private TestProjectableValuesTableSource( - ChangelogMode changelogMode, - boolean bounded, String runtimeSource, - DataType physicalRowDataType, - Collection<Tuple2<RowKind, Row>> data, - boolean nestedProjectionSupported) { - this.changelogMode = changelogMode; - this.bounded = bounded; - this.runtimeSource = runtimeSource; - this.physicalRowDataType = physicalRowDataType; - this.data = data; - this.nestedProjectionSupported = nestedProjectionSupported; - } - - @Override - public ChangelogMode getChangelogMode() { - return changelogMode; - } - - @SuppressWarnings("unchecked") - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) { - TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext - .createTypeInformation(physicalRowDataType) - .createSerializer(new ExecutionConfig()); - DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType); - Collection<RowData> values = convertToRowData(data, projectedFields, converter); - - if (runtimeSource.equals("SourceFunction")) { - try { - return SourceFunctionProvider.of( - new FromElementsFunction<>(serializer, values), - bounded); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (runtimeSource.equals("InputFormat")) { - return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer)); - } else { - throw new IllegalArgumentException("Unsupported runtime source class: " + runtimeSource); - } - } - - @Override - public DynamicTableSource copy() { - TestProjectableValuesTableSource newTableSource = new TestProjectableValuesTableSource( - changelogMode, bounded, runtimeSource, physicalRowDataType, data, nestedProjectionSupported); - newTableSource.projectedFields = projectedFields; - return newTableSource; - } - - @Override - public String asSummaryString() { - return "TestProjectableValues"; - } - - private static Collection<RowData> convertToRowData( - Collection<Tuple2<RowKind, Row>> data, - @Nullable int[] projectedFields, - DataStructureConverter converter) { - List<RowData> result = new ArrayList<>(); - for (Tuple2<RowKind, Row> value : data) { - Row projectedRow; - if (projectedFields == null) { - projectedRow = value.f1; - } else { - Object[] newValues = new Object[projectedFields.length]; - for (int i = 0; i < projectedFields.length; ++i) { - newValues[i] = value.f1.getField(projectedFields[i]); - } - projectedRow = Row.of(newValues); - } - RowData rowData = (RowData) converter.toInternal(projectedRow); - if (rowData != null) { - rowData.setRowKind(value.f0); - result.add(rowData); - } - } - return result; - } - - @Override - public boolean supportsNestedProjection() { - return nestedProjectionSupported; - } - - @Override - public void applyProjection(int[][] projectedFields) { - this.projectedFields = new int[projectedFields.length]; - FieldsDataType dataType = (FieldsDataType) physicalRowDataType; - RowType rowType = ((RowType) physicalRowDataType.getLogicalType()); - DataTypes.Field[] fields = new DataTypes.Field[projectedFields.length]; - for (int i = 0; i < projectedFields.length; ++i) { - int[] projection = projectedFields[i]; - Preconditions.checkArgument(projection.length == 1); - int index = projection[0]; - this.projectedFields[i] = index; - fields[i] = DataTypes.FIELD(rowType.getFieldNames().get(index), dataType.getChildren().get(index)); - } - this.physicalRowDataType = DataTypes.ROW(fields); - } - } -} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index e889814..47dcf42 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -40,6 +40,7 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.DynamicTableSinkFactory; @@ -55,7 +56,6 @@ import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.Retra import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; -import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; @@ -235,6 +235,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, .booleanType() .defaultValue(true); + private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions + .key("nested-projection-supported") + .booleanType() + .defaultValue(false); + @Override public String factoryIdentifier() { return IDENTIFIER; @@ -251,18 +256,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, String sourceClass = helper.getOptions().get(TABLE_SOURCE_CLASS); boolean isAsync = helper.getOptions().get(ASYNC_ENABLED); String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS); + boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED); if (sourceClass.equals("DEFAULT")) { Collection<Tuple2<RowKind, Row>> data = registeredData.getOrDefault(dataId, Collections.emptyList()); - DataType rowDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); return new TestValuesTableSource( + physicalSchema, changelogMode, isBounded, runtimeSource, - rowDataType, data, isAsync, - lookupFunctionClass); + lookupFunctionClass, + nestedProjectionSupported, + null); } else { try { return InstantiationUtil.instantiate( @@ -306,7 +314,8 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, ASYNC_ENABLED, TABLE_SOURCE_CLASS, SINK_INSERT_ONLY, - RUNTIME_SINK)); + RUNTIME_SINK, + NESTED_PROJECTION_SUPPORTED)); } private ChangelogMode parseChangelogMode(String string) { @@ -339,30 +348,37 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, /** * Values {@link DynamicTableSource} for testing. */ - private static class TestValuesTableSource implements ScanTableSource, LookupTableSource { + private static class TestValuesTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown { + private TableSchema physicalSchema; private final ChangelogMode changelogMode; private final boolean bounded; private final String runtimeSource; - private final DataType physicalRowDataType; private final Collection<Tuple2<RowKind, Row>> data; private final boolean isAsync; private final @Nullable String lookupFunctionClass; + private final boolean nestedProjectionSupported; + private @Nullable int[] projectedFields; private TestValuesTableSource( + TableSchema physicalSchema, ChangelogMode changelogMode, - boolean bounded, String runtimeSource, - DataType physicalRowDataType, + boolean bounded, + String runtimeSource, Collection<Tuple2<RowKind, Row>> data, boolean isAsync, - @Nullable String lookupFunctionClass) { + @Nullable String lookupFunctionClass, + boolean nestedProjectionSupported, + int[] projectedFields) { + this.physicalSchema = physicalSchema; this.changelogMode = changelogMode; this.bounded = bounded; this.runtimeSource = runtimeSource; - this.physicalRowDataType = physicalRowDataType; this.data = data; this.isAsync = isAsync; this.lookupFunctionClass = lookupFunctionClass; + this.nestedProjectionSupported = nestedProjectionSupported; + this.projectedFields = projectedFields; } @Override @@ -374,11 +390,11 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) { TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext - .createTypeInformation(physicalRowDataType) + .createTypeInformation(physicalSchema.toRowDataType()) .createSerializer(new ExecutionConfig()); - DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalRowDataType); + DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType()); converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader())); - Collection<RowData> values = convertToRowData(data, converter); + Collection<RowData> values = convertToRowData(data, projectedFields, converter); if (runtimeSource.equals("SourceFunction")) { try { @@ -438,8 +454,28 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, } @Override + public boolean supportsNestedProjection() { + return nestedProjectionSupported; + } + + @Override + public void applyProjection(int[][] projectedFields) { + this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields); + this.projectedFields = Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray(); + } + + @Override public DynamicTableSource copy() { - return new TestValuesTableSource(changelogMode, bounded, runtimeSource, physicalRowDataType, data, isAsync, lookupFunctionClass); + return new TestValuesTableSource( + physicalSchema, + changelogMode, + bounded, + runtimeSource, + data, + isAsync, + lookupFunctionClass, + nestedProjectionSupported, + projectedFields); } @Override @@ -449,10 +485,21 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory, private static Collection<RowData> convertToRowData( Collection<Tuple2<RowKind, Row>> data, + int[] projectedFields, DataStructureConverter converter) { List<RowData> result = new ArrayList<>(); for (Tuple2<RowKind, Row> value : data) { - RowData rowData = (RowData) converter.toInternal(value.f1); + Row projectedRow; + if (projectedFields == null) { + projectedRow = value.f1; + } else { + Object[] newValues = new Object[projectedFields.length]; + for (int i = 0; i < projectedFields.length; ++i) { + newValues[i] = value.f1.getField(projectedFields[i]); + } + projectedRow = Row.of(newValues); + } + RowData rowData = (RowData) converter.toInternal(projectedRow); if (rowData != null) { rowData.setRowKind(value.f0); result.add(rowData); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java index 955e3d4..b8a5b0b 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java @@ -54,7 +54,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac " b bigint,\n" + " c string\n" + ") WITH (\n" + - " 'connector' = 'projectable-values',\n" + + " 'connector' = 'values',\n" + " 'bounded' = 'true'\n" + ")"; util().tableEnv().executeSql(ddl1); @@ -66,7 +66,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac " c string,\n" + " d as a + 1\n" + ") WITH (\n" + - " 'connector' = 'projectable-values',\n" + + " 'connector' = 'values',\n" + " 'bounded' = 'true'\n" + ")"; util().tableEnv().executeSql(ddl2); @@ -92,7 +92,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends PushProjectIntoLegac " nested row<name string, `value` int>,\n" + " name string\n" + ") WITH (\n" + - " 'connector' = 'projectable-values',\n" + + " 'connector' = 'values',\n" + " 'nested-projection-supported' = '" + nestedProjectionSupported + "',\n" + " 'bounded' = 'true'\n" + ")"; diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 7632f4b..498fb98 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,5 +14,4 @@ # limitations under the License. org.apache.flink.table.planner.factories.TestValuesTableFactory -org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index 9c08f74..5e8c3fe 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -55,7 +55,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]) GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D]) +- Exchange(distribution=[single], changelogMode=[I,UB,UA]) +- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA]) - +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[ts, a, b], changelogMode=[I,UB,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a]]], fields=[a], changelogMode=[I,UB,UA]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala index 7b12fcf..9699a05 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala @@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase { | b bigint, | c varchar(32) |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'true' |) """.stripMargin @@ -49,7 +49,7 @@ class TableSourceTest extends TableTestBase { | nested row<name string, `value` int>, | name string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'true' |) |""".stripMargin diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala index 5f15680..b5f252b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala @@ -37,7 +37,7 @@ class TableSourceTest extends TableTestBase { | name varchar(32), | watermark for rowtime as rowtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -57,7 +57,7 @@ class TableSourceTest extends TableTestBase { | name varchar(32), | watermark for rowtime as rowtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase { | pTime as PROCTIME(), | watermark for pTime as pTime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for ptime as ptime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -128,7 +128,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -148,7 +148,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -168,7 +168,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for ptime as ptime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -188,7 +188,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -208,7 +208,7 @@ class TableSourceTest extends TableTestBase { | nested row<name string, `value` int>, | name string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'nested-projection-supported' = 'false', | 'bounded' = 'false' |) @@ -235,7 +235,7 @@ class TableSourceTest extends TableTestBase { | id int, | name varchar(32) |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala index 9d3ce05..0f0ead9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala @@ -39,7 +39,7 @@ class TableSourceTest extends TableTestBase { | name varchar(32), | watermark for rowtime as rowtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase { | name varchar(32), | watermark for rowtime as rowtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase { | proctime as PROCTIME(), | watermark for proctime as proctime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase { | name varchar(32), | proctime as PROCTIME() |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for ptime as ptime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -154,7 +154,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -175,7 +175,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -196,7 +196,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for ptime as ptime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -217,7 +217,7 @@ class TableSourceTest extends TableTestBase { | ptime as PROCTIME(), | watermark for rtime as rtime |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'bounded' = 'false' |) """.stripMargin @@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase { | nested row<name string, `value` int>, | name string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'nested-projection-supported' = 'false', | 'bounded' = 'false' |) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala index dda3bbe..2d799d1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala @@ -31,7 +31,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.data.{DecimalDataUtils, TimestampData} import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF} -import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory +import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row @@ -42,7 +42,6 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil import org.apache.flink.table.planner.utils.DateTimeTestUtil._ import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit._ @@ -1250,7 +1249,7 @@ class CalcITCase extends BatchTestBase { @Test def testSimpleProject(): Unit = { - val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3) + val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3) val ddl = s""" |CREATE TABLE SimpleTable ( @@ -1258,7 +1257,7 @@ class CalcITCase extends BatchTestBase { | b bigint, | c string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'data-id' = '$myTableDataId', | 'bounded' = 'true' |) @@ -1278,7 +1277,7 @@ class CalcITCase extends BatchTestBase { row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"), row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji") ) - val myTableDataId = TestProjectableValuesTableFactory.registerData(data) + val myTableDataId = TestValuesTableFactory.registerData(data) val ddl = s""" |CREATE TABLE NestedTable ( @@ -1288,7 +1287,7 @@ class CalcITCase extends BatchTestBase { | nested row<name string, `value` int>, | name string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'nested-projection-supported' = 'false', | 'data-id' = '$myTableDataId', | 'bounded' = 'true' diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 7cc6e66..d32c9ea 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.api.scala._ import org.apache.flink.table.data.{GenericRowData, RowData} -import org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory +import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils._ import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo @@ -290,7 +290,7 @@ class CalcITCase extends StreamingTestBase { @Test def testSimpleProject(): Unit = { - val myTableDataId = TestProjectableValuesTableFactory.registerData(TestData.smallData3) + val myTableDataId = TestValuesTableFactory.registerData(TestData.smallData3) val ddl = s""" |CREATE TABLE SimpleTable ( @@ -298,7 +298,7 @@ class CalcITCase extends StreamingTestBase { | b bigint, | c string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'data-id' = '$myTableDataId', | 'bounded' = 'true' |) @@ -321,7 +321,7 @@ class CalcITCase extends StreamingTestBase { row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), "mary"), row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 3333), "benji") ) - val myTableDataId = TestProjectableValuesTableFactory.registerData(data) + val myTableDataId = TestValuesTableFactory.registerData(data) val ddl = s""" |CREATE TABLE NestedTable ( @@ -331,7 +331,7 @@ class CalcITCase extends StreamingTestBase { | nested row<name string, `value` int>, | name string |) WITH ( - | 'connector' = 'projectable-values', + | 'connector' = 'values', | 'nested-projection-supported' = 'false', | 'data-id' = '$myTableDataId', | 'bounded' = 'true' diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala index 5b59ffb..403a240 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.data.binary.BinaryRowData import org.apache.flink.table.data.writer.BinaryRowWriter import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import org.apache.flink.table.planner.delegation.PlannerBase -import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory} +import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM @@ -81,7 +81,6 @@ class BatchTestBase extends BatchAbstractTestBase { @After def after(): Unit = { TestValuesTableFactory.clearAllData() - TestProjectableValuesTableFactory.clearAllRegisteredData() } /** diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala index 3b6bbe4..3fffbc0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala @@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala.StreamTableEnvironment -import org.apache.flink.table.api.{EnvironmentSettings, ImplicitExpressionConversions} -import org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, TestValuesTableFactory} +import org.apache.flink.table.api.ImplicitExpressionConversions +import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.api.{EnvironmentSettings, Table} import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row @@ -62,7 +62,6 @@ class StreamingTestBase extends AbstractTestBase { def after(): Unit = { StreamTestSink.clear() TestValuesTableFactory.clearAllData() - TestProjectableValuesTableFactory.clearAllRegisteredData() } /**
