This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71e64989ee0b7109271ced5e1af03cc3694783e9
Author: Jark Wu <[email protected]>
AuthorDate: Mon May 18 17:40:17 2020 +0800

    [hotfix][table] Improve testing implementation for the new projection push down
---
 .../apache/flink/table/utils/TableSchemaUtils.java |  20 ++
 .../flink/table/utils/TableSchemaUtilsTest.java    |  48 +++
 .../TestProjectableValuesTableFactory.java         | 326 ---------------------
 .../planner/factories/TestValuesTableFactory.java  |  79 ++++-
 .../PushProjectIntoTableSourceScanRuleTest.java    |   6 +-
 .../org.apache.flink.table.factories.Factory       |   1 -
 .../planner/plan/stream/sql/TableScanTest.xml      |   2 +-
 .../planner/plan/batch/sql/TableSourceTest.scala   |   4 +-
 .../planner/plan/stream/sql/TableSourceTest.scala  |  20 +-
 .../plan/stream/table/TableSourceTest.scala        |  20 +-
 .../planner/runtime/batch/sql/CalcITCase.scala     |  11 +-
 .../planner/runtime/stream/sql/CalcITCase.scala    |  10 +-
 .../planner/runtime/utils/BatchTestBase.scala      |   3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |   5 +-
 14 files changed, 170 insertions(+), 385 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 4863cb2..67ee125 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Utilities to {@link TableSchema}.
  */
@@ -62,6 +64,24 @@ public class TableSchemaUtils {
        }
 
        /**
+        * Creates a new {@link TableSchema} with the projected fields from 
another {@link TableSchema}.
+        * The new {@link TableSchema} doesn't contain any primary key or 
watermark information.
+        *
+        * @see 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
+        */
+       public static TableSchema projectSchema(TableSchema tableSchema, 
int[][] projectedFields) {
+               checkArgument(!containsGeneratedColumns(tableSchema), "It's 
illegal to project on a schema contains computed columns.");
+               TableSchema.Builder schemaBuilder = TableSchema.builder();
+               List<TableColumn> tableColumns = tableSchema.getTableColumns();
+               for (int[] fieldPath : projectedFields) {
+                       checkArgument(fieldPath.length == 1, "Nested projection 
push down is not supported yet.");
+                       TableColumn column = tableColumns.get(fieldPath[0]);
+                       schemaBuilder.field(column.getName(), column.getType());
+               }
+               return schemaBuilder.build();
+       }
+
+       /**
         * Returns true if there are any generated columns in the given {@link 
TableColumn}.
         */
        public static boolean containsGeneratedColumns(TableSchema schema) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index e96ddd9..3e760b1 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -70,4 +70,52 @@ public class TableSchemaUtilsTest {
                exceptionRule.expectMessage("Constraint ct2 to drop does not 
exist");
                TableSchemaUtils.dropConstraint(oriSchema, "ct2");
        }
+
+       @Test
+       public void testInvalidProjectSchema() {
+               {
+                       TableSchema schema = TableSchema.builder()
+                               .field("a", DataTypes.INT().notNull())
+                               .field("b", DataTypes.STRING())
+                               .field("c", DataTypes.INT(), "a + 1")
+                               .field("t", DataTypes.TIMESTAMP(3))
+                               .primaryKey("ct1", new String[]{"a"})
+                               .watermark("t", "t", DataTypes.TIMESTAMP(3))
+                               .build();
+                       exceptionRule.expect(IllegalArgumentException.class);
+                       exceptionRule.expectMessage("It's illegal to project on 
a schema contains computed columns.");
+                       int[][] projectedFields = {{1}};
+                       TableSchemaUtils.projectSchema(schema, projectedFields);
+               }
+
+               {
+                       TableSchema schema = TableSchema.builder()
+                               .field("a", DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.STRING())))
+                               .field("b", DataTypes.STRING())
+                               .build();
+                       exceptionRule.expect(IllegalArgumentException.class);
+                       exceptionRule.expectMessage("Nested projection push 
down is not supported yet.");
+                       int[][] projectedFields = {{0, 1}};
+                       TableSchemaUtils.projectSchema(schema, projectedFields);
+               }
+       }
+
+       @Test
+       public void testProjectSchema() {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.INT().notNull())
+                       .field("b", DataTypes.STRING())
+                       .field("t", DataTypes.TIMESTAMP(3))
+                       .primaryKey("a")
+                       .watermark("t", "t", DataTypes.TIMESTAMP(3))
+                       .build();
+
+               int[][] projectedFields = {{2}, {0}};
+               TableSchema projected = TableSchemaUtils.projectSchema(schema, 
projectedFields);
+               TableSchema expected = TableSchema.builder()
+                       .field("t", DataTypes.TIMESTAMP(3))
+                       .field("a", DataTypes.INT().notNull())
+                       .build();
+               assertEquals(expected, projected);
+       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
deleted file mode 100644
index c5367ea..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestProjectableValuesTableFactory.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.factories;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.collection.Seq;
-
-/**
- * Test implementation of {@link DynamicTableSourceFactory} that supports 
projection push down.
- */
-public class TestProjectableValuesTableFactory implements 
DynamicTableSourceFactory {
-
-       // 
--------------------------------------------------------------------------------------------
-       // Data Registration
-       // 
--------------------------------------------------------------------------------------------
-
-       private static final AtomicInteger idCounter = new AtomicInteger(0);
-       private static final Map<String, Collection<Tuple2<RowKind, Row>>> 
registeredData = new HashMap<>();
-
-       /**
-        * Register the given data into the data factory context and return the 
data id.
-        * The data id can be used as a reference to the registered data in 
data connector DDL.
-        */
-       public static String registerData(Collection<Row> data) {
-               List<Tuple2<RowKind, Row>> dataWithKinds = new ArrayList<>();
-               for (Row row : data) {
-                       dataWithKinds.add(Tuple2.of(RowKind.INSERT, row));
-               }
-               return registerChangelogData(dataWithKinds);
-       }
-
-       /**
-        * Register the given data into the data factory context and return the 
data id.
-        * The data id can be used as a reference to the registered data in 
data connector DDL.
-        */
-       public static String registerData(Seq<Row> data) {
-               return registerData(JavaScalaConversionUtil.toJava(data));
-       }
-
-       /**
-        * Register the given data with RowKind into the data factory context 
and return the data id.
-        * The data id can be used as a reference to the registered data in 
data connector DDL.
-        * TODO: remove this utility once Row supports RowKind.
-        */
-       public static String registerChangelogData(Collection<Tuple2<RowKind, 
Row>> data) {
-               String id = String.valueOf(idCounter.incrementAndGet());
-               registeredData.put(id, data);
-               return id;
-       }
-
-       /**
-        * Removes the registered data under the given data id.
-        */
-       public static void clearAllRegisteredData() {
-               registeredData.clear();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Factory
-       // 
--------------------------------------------------------------------------------------------
-
-       private static final String IDENTIFIER = "projectable-values";
-
-       private static final ConfigOption<String> DATA_ID = ConfigOptions
-                       .key("data-id")
-                       .stringType()
-                       .defaultValue(null);
-
-       private static final ConfigOption<Boolean> BOUNDED = ConfigOptions
-                       .key("bounded")
-                       .booleanType()
-                       .defaultValue(false);
-
-       private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions
-                       .key("changelog-mode")
-                       .stringType()
-                       .defaultValue("I"); // all available "I,UA,UB,D"
-
-       private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions
-                       .key("runtime-source")
-                       .stringType()
-                       .defaultValue("SourceFunction"); // another is 
"InputFormat"
-
-       private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED 
= ConfigOptions
-                       .key("nested-projection-supported")
-                       .booleanType()
-                       .defaultValue(false);
-
-       @Override
-       public String factoryIdentifier() {
-               return IDENTIFIER;
-       }
-
-       @Override
-       public DynamicTableSource createDynamicTableSource(Context context) {
-               FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
-               helper.validate();
-               ChangelogMode changelogMode = 
parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
-               String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
-               boolean isBounded = helper.getOptions().get(BOUNDED);
-               String dataId = helper.getOptions().get(DATA_ID);
-               boolean nestedProjectionSupported = 
helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
-
-               Collection<Tuple2<RowKind, Row>> data = 
registeredData.getOrDefault(dataId, Collections.emptyList());
-               DataType rowDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
-               return new TestProjectableValuesTableSource(
-                               changelogMode,
-                               isBounded,
-                               runtimeSource,
-                               rowDataType,
-                               data,
-                               nestedProjectionSupported);
-       }
-
-       @Override
-       public Set<ConfigOption<?>> requiredOptions() {
-               return Collections.emptySet();
-       }
-
-       @Override
-       public Set<ConfigOption<?>> optionalOptions() {
-               return new HashSet<>(Arrays.asList(
-                               DATA_ID,
-                               CHANGELOG_MODE,
-                               BOUNDED,
-                               RUNTIME_SOURCE,
-                               NESTED_PROJECTION_SUPPORTED));
-       }
-
-       private ChangelogMode parseChangelogMode(String string) {
-               ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-               for (String split : string.split(",")) {
-                       switch (split.trim()) {
-                               case "I":
-                                       
builder.addContainedKind(RowKind.INSERT);
-                                       break;
-                               case "UB":
-                                       
builder.addContainedKind(RowKind.UPDATE_BEFORE);
-                                       break;
-                               case "UA":
-                                       
builder.addContainedKind(RowKind.UPDATE_AFTER);
-                                       break;
-                               case "D":
-                                       
builder.addContainedKind(RowKind.DELETE);
-                                       break;
-                               default:
-                                       throw new 
IllegalArgumentException("Invalid ChangelogMode string: " + string);
-                       }
-               }
-               return builder.build();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Table source
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Values {@link DynamicTableSource} for testing.
-        */
-       private static class TestProjectableValuesTableSource implements 
ScanTableSource, SupportsProjectionPushDown {
-
-               private final ChangelogMode changelogMode;
-               private final boolean bounded;
-               private final String runtimeSource;
-               private DataType physicalRowDataType;
-               private final Collection<Tuple2<RowKind, Row>> data;
-               private final boolean nestedProjectionSupported;
-               private int[] projectedFields = null;
-
-               private TestProjectableValuesTableSource(
-                               ChangelogMode changelogMode,
-                               boolean bounded, String runtimeSource,
-                               DataType physicalRowDataType,
-                               Collection<Tuple2<RowKind, Row>> data,
-                               boolean nestedProjectionSupported) {
-                       this.changelogMode = changelogMode;
-                       this.bounded = bounded;
-                       this.runtimeSource = runtimeSource;
-                       this.physicalRowDataType = physicalRowDataType;
-                       this.data = data;
-                       this.nestedProjectionSupported = 
nestedProjectionSupported;
-               }
-
-               @Override
-               public ChangelogMode getChangelogMode() {
-                       return changelogMode;
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public ScanRuntimeProvider 
getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
-                       TypeSerializer<RowData> serializer = 
(TypeSerializer<RowData>) runtimeProviderContext
-                                       
.createTypeInformation(physicalRowDataType)
-                                       .createSerializer(new 
ExecutionConfig());
-                       DataStructureConverter converter = 
runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
-                       Collection<RowData> values = convertToRowData(data, 
projectedFields, converter);
-
-                       if (runtimeSource.equals("SourceFunction")) {
-                               try {
-                                       return SourceFunctionProvider.of(
-                                                       new 
FromElementsFunction<>(serializer, values),
-                                                       bounded);
-                               } catch (IOException e) {
-                                       throw new RuntimeException(e);
-                               }
-                       } else if (runtimeSource.equals("InputFormat")) {
-                               return InputFormatProvider.of(new 
CollectionInputFormat<>(values, serializer));
-                       } else {
-                               throw new IllegalArgumentException("Unsupported 
runtime source class: " + runtimeSource);
-                       }
-               }
-
-               @Override
-               public DynamicTableSource copy() {
-                       TestProjectableValuesTableSource newTableSource = new 
TestProjectableValuesTableSource(
-                                       changelogMode, bounded, runtimeSource, 
physicalRowDataType, data, nestedProjectionSupported);
-                       newTableSource.projectedFields = projectedFields;
-                       return newTableSource;
-               }
-
-               @Override
-               public String asSummaryString() {
-                       return "TestProjectableValues";
-               }
-
-               private static Collection<RowData> convertToRowData(
-                               Collection<Tuple2<RowKind, Row>> data,
-                               @Nullable int[] projectedFields,
-                               DataStructureConverter converter) {
-                       List<RowData> result = new ArrayList<>();
-                       for (Tuple2<RowKind, Row> value : data) {
-                               Row projectedRow;
-                               if (projectedFields == null) {
-                                       projectedRow = value.f1;
-                               } else {
-                                       Object[] newValues = new 
Object[projectedFields.length];
-                                       for (int i = 0; i < 
projectedFields.length; ++i) {
-                                               newValues[i] = 
value.f1.getField(projectedFields[i]);
-                                       }
-                                       projectedRow = Row.of(newValues);
-                               }
-                               RowData rowData = (RowData) 
converter.toInternal(projectedRow);
-                               if (rowData != null) {
-                                       rowData.setRowKind(value.f0);
-                                       result.add(rowData);
-                               }
-                       }
-                       return result;
-               }
-
-               @Override
-               public boolean supportsNestedProjection() {
-                       return nestedProjectionSupported;
-               }
-
-               @Override
-               public void applyProjection(int[][] projectedFields) {
-                       this.projectedFields = new int[projectedFields.length];
-                       FieldsDataType dataType = (FieldsDataType) 
physicalRowDataType;
-                       RowType rowType = ((RowType) 
physicalRowDataType.getLogicalType());
-                       DataTypes.Field[] fields = new 
DataTypes.Field[projectedFields.length];
-                       for (int i = 0; i < projectedFields.length; ++i) {
-                               int[] projection = projectedFields[i];
-                               Preconditions.checkArgument(projection.length 
== 1);
-                               int index = projection[0];
-                               this.projectedFields[i] = index;
-                               fields[i] = 
DataTypes.FIELD(rowType.getFieldNames().get(index), 
dataType.getChildren().get(index));
-                       }
-                       this.physicalRowDataType = DataTypes.ROW(fields);
-               }
-       }
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index e889814..47dcf42 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -55,7 +56,6 @@ import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.Retra
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -235,6 +235,11 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
                .booleanType()
                .defaultValue(true);
 
+       private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED 
= ConfigOptions
+               .key("nested-projection-supported")
+               .booleanType()
+               .defaultValue(false);
+
        @Override
        public String factoryIdentifier() {
                return IDENTIFIER;
@@ -251,18 +256,21 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
                String sourceClass = 
helper.getOptions().get(TABLE_SOURCE_CLASS);
                boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
                String lookupFunctionClass = 
helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
+               boolean nestedProjectionSupported = 
helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
 
                if (sourceClass.equals("DEFAULT")) {
                        Collection<Tuple2<RowKind, Row>> data = 
registeredData.getOrDefault(dataId, Collections.emptyList());
-                       DataType rowDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
+                       TableSchema physicalSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
                        return new TestValuesTableSource(
+                               physicalSchema,
                                changelogMode,
                                isBounded,
                                runtimeSource,
-                               rowDataType,
                                data,
                                isAsync,
-                               lookupFunctionClass);
+                               lookupFunctionClass,
+                               nestedProjectionSupported,
+                               null);
                } else {
                        try {
                                return InstantiationUtil.instantiate(
@@ -306,7 +314,8 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
                        ASYNC_ENABLED,
                        TABLE_SOURCE_CLASS,
                        SINK_INSERT_ONLY,
-                       RUNTIME_SINK));
+                       RUNTIME_SINK,
+                       NESTED_PROJECTION_SUPPORTED));
        }
 
        private ChangelogMode parseChangelogMode(String string) {
@@ -339,30 +348,37 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
        /**
         * Values {@link DynamicTableSource} for testing.
         */
-       private static class TestValuesTableSource implements ScanTableSource, 
LookupTableSource {
+       private static class TestValuesTableSource implements ScanTableSource, 
LookupTableSource, SupportsProjectionPushDown {
 
+               private TableSchema physicalSchema;
                private final ChangelogMode changelogMode;
                private final boolean bounded;
                private final String runtimeSource;
-               private final DataType physicalRowDataType;
                private final Collection<Tuple2<RowKind, Row>> data;
                private final boolean isAsync;
                private final @Nullable String lookupFunctionClass;
+               private final boolean nestedProjectionSupported;
+               private @Nullable int[] projectedFields;
 
                private TestValuesTableSource(
+                               TableSchema physicalSchema,
                                ChangelogMode changelogMode,
-                               boolean bounded, String runtimeSource,
-                               DataType physicalRowDataType,
+                               boolean bounded,
+                               String runtimeSource,
                                Collection<Tuple2<RowKind, Row>> data,
                                boolean isAsync,
-                               @Nullable String lookupFunctionClass) {
+                               @Nullable String lookupFunctionClass,
+                               boolean nestedProjectionSupported,
+                               int[] projectedFields) {
+                       this.physicalSchema = physicalSchema;
                        this.changelogMode = changelogMode;
                        this.bounded = bounded;
                        this.runtimeSource = runtimeSource;
-                       this.physicalRowDataType = physicalRowDataType;
                        this.data = data;
                        this.isAsync = isAsync;
                        this.lookupFunctionClass = lookupFunctionClass;
+                       this.nestedProjectionSupported = 
nestedProjectionSupported;
+                       this.projectedFields = projectedFields;
                }
 
                @Override
@@ -374,11 +390,11 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
                @Override
                public ScanRuntimeProvider 
getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
                        TypeSerializer<RowData> serializer = 
(TypeSerializer<RowData>) runtimeProviderContext
-                               .createTypeInformation(physicalRowDataType)
+                               
.createTypeInformation(physicalSchema.toRowDataType())
                                .createSerializer(new ExecutionConfig());
-                       DataStructureConverter converter = 
runtimeProviderContext.createDataStructureConverter(physicalRowDataType);
+                       DataStructureConverter converter = 
runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
                        
converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
-                       Collection<RowData> values = convertToRowData(data, 
converter);
+                       Collection<RowData> values = convertToRowData(data, 
projectedFields, converter);
 
                        if (runtimeSource.equals("SourceFunction")) {
                                try {
@@ -438,8 +454,28 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
                }
 
                @Override
+               public boolean supportsNestedProjection() {
+                       return nestedProjectionSupported;
+               }
+
+               @Override
+               public void applyProjection(int[][] projectedFields) {
+                       this.physicalSchema = 
TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+                       this.projectedFields = 
Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
+               }
+
+               @Override
                public DynamicTableSource copy() {
-                       return new TestValuesTableSource(changelogMode, 
bounded, runtimeSource, physicalRowDataType, data, isAsync, 
lookupFunctionClass);
+                       return new TestValuesTableSource(
+                               physicalSchema,
+                               changelogMode,
+                               bounded,
+                               runtimeSource,
+                               data,
+                               isAsync,
+                               lookupFunctionClass,
+                               nestedProjectionSupported,
+                               projectedFields);
                }
 
                @Override
@@ -449,10 +485,21 @@ public final class TestValuesTableFactory implements 
DynamicTableSourceFactory,
 
                private static Collection<RowData> convertToRowData(
                                Collection<Tuple2<RowKind, Row>> data,
+                               int[] projectedFields,
                                DataStructureConverter converter) {
                        List<RowData> result = new ArrayList<>();
                        for (Tuple2<RowKind, Row> value : data) {
-                               RowData rowData = (RowData) 
converter.toInternal(value.f1);
+                               Row projectedRow;
+                               if (projectedFields == null) {
+                                       projectedRow = value.f1;
+                               } else {
+                                       Object[] newValues = new 
Object[projectedFields.length];
+                                       for (int i = 0; i < 
projectedFields.length; ++i) {
+                                               newValues[i] = 
value.f1.getField(projectedFields[i]);
+                                       }
+                                       projectedRow = Row.of(newValues);
+                               }
+                               RowData rowData = (RowData) 
converter.toInternal(projectedRow);
                                if (rowData != null) {
                                        rowData.setRowKind(value.f0);
                                        result.add(rowData);
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 955e3d4..b8a5b0b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -54,7 +54,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends 
PushProjectIntoLegac
                                                "  b bigint,\n" +
                                                "  c string\n" +
                                                ") WITH (\n" +
-                                               " 'connector' = 
'projectable-values',\n" +
+                                               " 'connector' = 'values',\n" +
                                                " 'bounded' = 'true'\n" +
                                                ")";
                util().tableEnv().executeSql(ddl1);
@@ -66,7 +66,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends 
PushProjectIntoLegac
                                                "  c string,\n" +
                                                "  d as a + 1\n" +
                                                ") WITH (\n" +
-                                               " 'connector' = 
'projectable-values',\n" +
+                                               " 'connector' = 'values',\n" +
                                                " 'bounded' = 'true'\n" +
                                                ")";
                util().tableEnv().executeSql(ddl2);
@@ -92,7 +92,7 @@ public class PushProjectIntoTableSourceScanRuleTest extends 
PushProjectIntoLegac
                                                "  nested row<name string, 
`value` int>,\n" +
                                                "  name string\n" +
                                                ") WITH (\n" +
-                                               " 'connector' = 
'projectable-values',\n" +
+                                               " 'connector' = 'values',\n" +
                                                " 'nested-projection-supported' 
= '" + nestedProjectionSupported + "',\n" +
                                                "  'bounded' = 'true'\n" +
                                                ")";
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7632f4b..498fb98 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.table.planner.factories.TestValuesTableFactory
-org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
 org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 9c08f74..5e8c3fe 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -55,7 +55,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
 GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
 +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
    +- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA])
-      +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[ts, a, b], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[a]]], fields=[a], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index 7b12fcf..9699a05 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
          |  b bigint,
          |  c varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'true'
          |)
        """.stripMargin
@@ -49,7 +49,7 @@ class TableSourceTest extends TableTestBase {
         |  nested row<name string, `value` int>,
         |  name string
         |) WITH (
-        | 'connector' = 'projectable-values',
+        | 'connector' = 'values',
         |  'bounded' = 'true'
         |)
         |""".stripMargin
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 5f15680..b5f252b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -37,7 +37,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -57,7 +57,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  pTime as PROCTIME(),
          |  watermark for pTime as pTime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -128,7 +128,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -148,7 +148,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -168,7 +168,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -188,7 +188,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -208,7 +208,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
@@ -235,7 +235,7 @@ class TableSourceTest extends TableTestBase {
          |  id int,
          |  name varchar(32)
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 9d3ce05..0f0ead9 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -39,7 +39,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  watermark for rowtime as rowtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -86,7 +86,7 @@ class TableSourceTest extends TableTestBase {
          |  proctime as PROCTIME(),
          |  watermark for proctime as proctime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -107,7 +107,7 @@ class TableSourceTest extends TableTestBase {
          |  name varchar(32),
          |  proctime as PROCTIME()
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -154,7 +154,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -175,7 +175,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -196,7 +196,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for ptime as ptime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -217,7 +217,7 @@ class TableSourceTest extends TableTestBase {
          |  ptime as PROCTIME(),
          |  watermark for rtime as rtime
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'bounded' = 'false'
          |)
        """.stripMargin
@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'bounded' = 'false'
          |)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dda3bbe..2d799d1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -31,7 +31,7 @@ import 
org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
 import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
 import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, 
RichFunc3, SplitUDF}
-import 
org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import 
org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecSortRule
 import 
org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil.parseFieldNames
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -42,7 +42,6 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import 
org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
 import org.junit._
 
@@ -1250,7 +1249,7 @@ class CalcITCase extends BatchTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = 
TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = 
TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -1258,7 +1257,7 @@ class CalcITCase extends BatchTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -1278,7 +1277,7 @@ class CalcITCase extends BatchTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), 
"mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 
3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -1288,7 +1287,7 @@ class CalcITCase extends BatchTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 7cc6e66..d32c9ea 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.data.{GenericRowData, RowData}
-import 
org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
@@ -290,7 +290,7 @@ class CalcITCase extends StreamingTestBase {
 
   @Test
   def testSimpleProject(): Unit = {
-    val myTableDataId = 
TestProjectableValuesTableFactory.registerData(TestData.smallData3)
+    val myTableDataId = 
TestValuesTableFactory.registerData(TestData.smallData3)
     val ddl =
       s"""
          |CREATE TABLE SimpleTable (
@@ -298,7 +298,7 @@ class CalcITCase extends StreamingTestBase {
          |  b bigint,
          |  c string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
          |)
@@ -321,7 +321,7 @@ class CalcITCase extends StreamingTestBase {
       row(2, row(row("HELLO", 22), row(222, false)), row("hello", 2222), 
"mary"),
       row(3, row(row("HELLO WORLD", 33), row(333, true)), row("hello world", 
3333), "benji")
     )
-    val myTableDataId = TestProjectableValuesTableFactory.registerData(data)
+    val myTableDataId = TestValuesTableFactory.registerData(data)
     val ddl =
       s"""
          |CREATE TABLE NestedTable (
@@ -331,7 +331,7 @@ class CalcITCase extends StreamingTestBase {
          |  nested row<name string, `value` int>,
          |  name string
          |) WITH (
-         |  'connector' = 'projectable-values',
+         |  'connector' = 'values',
          |  'nested-projection-supported' = 'false',
          |  'data-id' = '$myTableDataId',
          |  'bounded' = 'true'
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 5b59ffb..403a240 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.writer.BinaryRowWriter
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction}
 import org.apache.flink.table.planner.delegation.PlannerBase
-import 
org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, 
TestValuesTableFactory}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
@@ -81,7 +81,6 @@ class BatchTestBase extends BatchAbstractTestBase {
   @After
   def after(): Unit = {
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 3b6bbe4..3fffbc0 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, 
ImplicitExpressionConversions}
-import 
org.apache.flink.table.planner.factories.{TestProjectableValuesTableFactory, 
TestValuesTableFactory}
+import org.apache.flink.table.api.ImplicitExpressionConversions
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.api.{EnvironmentSettings, Table}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -62,7 +62,6 @@ class StreamingTestBase extends AbstractTestBase {
   def after(): Unit = {
     StreamTestSink.clear()
     TestValuesTableFactory.clearAllData()
-    TestProjectableValuesTableFactory.clearAllRegisteredData()
   }
 
   /**

Reply via email to