This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5111c9  [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat
e5111c9 is described below

commit e5111c970877b5772f0326ffbc998e0f6a8d351f
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Nov 11 10:53:43 2021 +0100

    [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat
    
    Clearly separates projectable formats from non-projectable formats. Before
    this, the semantics were not 100% clear, which led to inconsistent connector
    and format implementations.
    
    The FileSystemTableSource has been updated to distinguish between
    those two interfaces now. Users that implemented custom formats for
    FileSystemTableSource might need to verify the implementation.
    
    For convenience, we introduce helper classes such as ProjectedRowData
    and Projection to ease the implementation.
    
    This closes #17768.
---
 .../util/RecordMapperWrapperRecordIterator.java    |  33 +-
 .../confluent/RegistryAvroFormatFactory.java       |   8 +-
 .../debezium/DebeziumAvroFormatFactory.java        |   8 +-
 .../flink/formats/avro/AvroFormatFactory.java      |   9 +-
 .../flink/formats/json/JsonFormatFactory.java      |   9 +-
 .../json/canal/CanalJsonDecodingFormat.java        |   7 +-
 .../json/debezium/DebeziumJsonDecodingFormat.java  |   7 +-
 .../json/maxwell/MaxwellJsonDecodingFormat.java    |   8 +-
 .../org/apache/flink/orc/OrcFileFormatFactory.java | 124 +++----
 .../formats/parquet/ParquetFileFormatFactory.java  |  64 ++--
 .../apache/flink/table/connector/Projection.java   | 389 +++++++++++++++++++++
 .../table/connector/format/DecodingFormat.java     |  56 +++
 .../format/ProjectableDecodingFormat.java          |  71 ++++
 .../abilities/SupportsProjectionPushDown.java      |   5 +
 .../flink/table/data/utils/ProjectedRowData.java   | 235 +++++++++++++
 .../flink/table/connector/ProjectionTest.java      | 166 +++++++++
 .../table/data/utils/ProjectedRowDataTest.java     |  66 ++++
 .../filesystem/FileInfoExtractorBulkFormat.java    |  40 +--
 .../table/filesystem/FileSystemTableSource.java    | 106 ++++--
 .../table/filesystem/ProjectingBulkFormat.java     |  84 +++++
 .../filesystem/TestCsvFileSystemFormatFactory.java |  27 +-
 21 files changed, 1324 insertions(+), 198 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
index 76c4fb0..6d916bf 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
@@ -20,9 +20,14 @@ package org.apache.flink.connector.file.src.util;
 
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
 /**
  * Implementation of {@link 
org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator}
- * that wraps another iterator and performs the mapping of the records.
+ * that wraps another iterator and performs the mapping of the records. You 
can use {@link
+ * #wrapReader(BulkFormat.Reader, RecordMapper)} to wrap a whole reader.
  *
  * @param <I> Input type
  * @param <O> Mapped output type
@@ -61,4 +66,30 @@ public class RecordMapperWrapperRecordIterator<I, O> 
implements BulkFormat.Recor
     public void releaseBatch() {
         this.wrapped.releaseBatch();
     }
+
+    /**
+     * Wrap a {@link BulkFormat.Reader} applying a {@link RecordMapper} on the 
returned iterator.
+     *
+     * @param <I> Input type
+     * @param <O> Mapped output type
+     */
+    public static <I, O> BulkFormat.Reader<O> wrapReader(
+            BulkFormat.Reader<I> wrappedReader, RecordMapper<I, O> 
recordMapper) {
+        return new BulkFormat.Reader<O>() {
+            @Nullable
+            @Override
+            public BulkFormat.RecordIterator<O> readBatch() throws IOException 
{
+                BulkFormat.RecordIterator<I> iterator = 
wrappedReader.readBatch();
+                if (iterator == null) {
+                    return null;
+                }
+                return new RecordMapperWrapperRecordIterator<>(iterator, 
recordMapper);
+            }
+
+            @Override
+            public void close() throws IOException {
+                wrappedReader.close();
+            }
+        };
+    }
 }
diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index c1d0ff81..d4771fe 100644
--- 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -81,10 +82,13 @@ public class RegistryAvroFormatFactory
         String schemaRegistryURL = formatOptions.get(URL);
         Map<String, ?> optionalPropertiesMap = 
buildOptionalPropertiesMap(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType 
producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType producedDataType,
+                    int[][] projections) {
+                producedDataType = DataType.projectFields(producedDataType, 
projections);
                 final RowType rowType = (RowType) 
producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
index eaf069a..d889dee 100644
--- 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
+++ 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -75,10 +76,13 @@ public class DebeziumAvroFormatFactory
         String schemaRegistryURL = formatOptions.get(URL);
         Map<String, ?> optionalPropertiesMap = 
buildOptionalPropertiesMap(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType 
producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType producedDataType,
+                    int[][] projections) {
+                producedDataType = DataType.projectFields(producedDataType, 
projections);
                 final RowType rowType = (RowType) 
producedDataType.getLogicalType();
                 final TypeInformation<RowData> producedTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
index 7faa3e1..3860877 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -54,10 +55,14 @@ public class AvroFormatFactory implements 
DeserializationFormatFactory, Serializ
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType 
producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                final DataType producedDataType =
+                        DataType.projectFields(physicalDataType, projections);
                 final RowType rowType = (RowType) 
producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 692973f..b78c024 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -68,10 +69,14 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
         final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
         TimestampFormat timestampOption = 
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType 
producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                final DataType producedDataType =
+                        DataType.projectFields(physicalDataType, projections);
                 final RowType rowType = (RowType) 
producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
index f958c8d..76f4e2e 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.Metada
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -43,7 +44,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Canal using JSON encoding. */
-public class CanalJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+public class CanalJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -77,7 +79,8 @@ public class CanalJsonDecodingFormat implements 
DecodingFormat<DeserializationSc
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, 
projections);
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
                         .map(
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
index 5db2f64..472596a 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
@@ -43,7 +44,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Debezium using JSON encoding. */
-public class DebeziumJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+public class DebeziumJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -71,7 +73,8 @@ public class DebeziumJsonDecodingFormat implements 
DecodingFormat<Deserializatio
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, 
projections);
 
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
index d040386..8aae68f 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.Me
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -41,7 +42,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Maxwell using JSON encoding. */
-public class MaxwellJsonDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
+public class MaxwellJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -62,7 +64,9 @@ public class MaxwellJsonDecodingFormat implements 
DecodingFormat<Deserialization
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, 
projections);
+
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
                         .map(
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 7343959..d6d4e95 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -27,8 +27,10 @@ import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -38,8 +40,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
-import org.apache.flink.table.filesystem.PartitionFieldExtractor;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -85,71 +86,7 @@ public class OrcFileFormatFactory implements 
BulkReaderFormatFactory, BulkWriter
     @Override
     public BulkDecodingFormat<RowData> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
-        return new BulkDecodingFormat<RowData>() {
-
-            private List<ResolvedExpression> filters;
-
-            @Override
-            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
-                    DynamicTableSource.Context sourceContext, DataType 
producedDataType) {
-                List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
-
-                if (filters != null) {
-                    for (Expression pred : filters) {
-                        OrcFilters.Predicate orcPred = 
OrcFilters.toOrcPredicate(pred);
-                        if (orcPred != null) {
-                            orcPredicates.add(orcPred);
-                        }
-                    }
-                }
-
-                RowType tableType =
-                        (RowType)
-                                context.getCatalogTable()
-                                        .getSchema()
-                                        .toPhysicalRowDataType()
-                                        .getLogicalType();
-                List<String> tableFieldNames = tableType.getFieldNames();
-                RowType projectedType = (RowType) 
producedDataType.getLogicalType();
-
-                int[] selectedFields =
-                        projectedType.getFieldNames().stream()
-                                .mapToInt(tableFieldNames::indexOf)
-                                .toArray();
-
-                Properties properties = getOrcProperties(formatOptions);
-                Configuration conf = new Configuration();
-                properties.forEach((k, v) -> conf.set(k.toString(), 
v.toString()));
-
-                String defaultPartName =
-                        context.getCatalogTable()
-                                .getOptions()
-                                .getOrDefault(
-                                        
FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.key(),
-                                        
FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
-                                                .defaultValue());
-
-                return OrcColumnarRowFileInputFormat.createPartitionedFormat(
-                        OrcShim.defaultShim(),
-                        conf,
-                        tableType,
-                        context.getCatalogTable().getPartitionKeys(),
-                        PartitionFieldExtractor.forFileSystem(defaultPartName),
-                        selectedFields,
-                        orcPredicates,
-                        VectorizedColumnBatch.DEFAULT_SIZE);
-            }
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
-            }
-
-            @Override
-            public void applyFilters(List<ResolvedExpression> filters) {
-                this.filters = filters;
-            }
-        };
+        return new OrcBulkDecodingFormat(formatOptions);
     }
 
     @Override
@@ -177,4 +114,57 @@ public class OrcFileFormatFactory implements 
BulkReaderFormatFactory, BulkWriter
             }
         };
     }
+
+    private static class OrcBulkDecodingFormat
+            implements BulkDecodingFormat<RowData>,
+                    ProjectableDecodingFormat<BulkFormat<RowData, 
FileSourceSplit>> {
+
+        private final ReadableConfig formatOptions;
+        private List<ResolvedExpression> filters;
+
+        public OrcBulkDecodingFormat(ReadableConfig formatOptions) {
+            this.formatOptions = formatOptions;
+        }
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                DynamicTableSource.Context sourceContext,
+                DataType producedDataType,
+                int[][] projections) {
+            List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
+
+            if (filters != null) {
+                for (Expression pred : filters) {
+                    OrcFilters.Predicate orcPred = 
OrcFilters.toOrcPredicate(pred);
+                    if (orcPred != null) {
+                        orcPredicates.add(orcPred);
+                    }
+                }
+            }
+
+            Properties properties = getOrcProperties(formatOptions);
+            Configuration conf = new Configuration();
+            properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
+            return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                    OrcShim.defaultShim(),
+                    conf,
+                    (RowType) producedDataType.getLogicalType(),
+                    Collections.emptyList(),
+                    null,
+                    Projection.of(projections).toTopLevelIndexes(),
+                    orcPredicates,
+                    VectorizedColumnBatch.DEFAULT_SIZE);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public void applyFilters(List<ResolvedExpression> filters) {
+            this.filters = filters;
+        }
+    }
 }
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
index bf28661..f8e99f2c 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -34,13 +35,12 @@ import 
org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
-import org.apache.flink.table.filesystem.PartitionFieldExtractor;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
@@ -64,32 +64,7 @@ public class ParquetFileFormatFactory implements 
BulkReaderFormatFactory, BulkWr
     @Override
     public BulkDecodingFormat<RowData> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
-        return new BulkDecodingFormat<RowData>() {
-            @Override
-            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
-                    DynamicTableSource.Context sourceContext, DataType 
producedDataType) {
-                String defaultPartName =
-                        context.getCatalogTable()
-                                .getOptions()
-                                .getOrDefault(
-                                        
FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.key(),
-                                        
FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
-                                                .defaultValue());
-                return ParquetColumnarRowInputFormat.createPartitionedFormat(
-                        getParquetConfiguration(formatOptions),
-                        (RowType) producedDataType.getLogicalType(),
-                        context.getCatalogTable().getPartitionKeys(),
-                        PartitionFieldExtractor.forFileSystem(defaultPartName),
-                        VectorizedColumnBatch.DEFAULT_SIZE,
-                        formatOptions.get(UTC_TIMEZONE),
-                        true);
-            }
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
-            }
-        };
+        return new ParquetBulkDecodingFormat(formatOptions);
     }
 
     @Override
@@ -134,4 +109,37 @@ public class ParquetFileFormatFactory implements 
BulkReaderFormatFactory, BulkWr
     public Set<ConfigOption<?>> optionalOptions() {
         return new HashSet<>();
     }
+
+    private static class ParquetBulkDecodingFormat
+            implements ProjectableDecodingFormat<BulkFormat<RowData, 
FileSourceSplit>>,
+                    BulkDecodingFormat<RowData> {
+
+        private final ReadableConfig formatOptions;
+
+        public ParquetBulkDecodingFormat(ReadableConfig formatOptions) {
+            this.formatOptions = formatOptions;
+        }
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                DynamicTableSource.Context sourceContext,
+                DataType producedDataType,
+                int[][] projections) {
+
+            return ParquetColumnarRowInputFormat.createPartitionedFormat(
+                    getParquetConfiguration(formatOptions),
+                    (RowType)
+                            DataType.projectFields(producedDataType, 
projections).getLogicalType(),
+                    Collections.emptyList(),
+                    null,
+                    VectorizedColumnBatch.DEFAULT_SIZE,
+                    formatOptions.get(UTC_TIMEZONE),
+                    true);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java
new file mode 100644
index 0000000..e2758aa
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * {@link Projection} represents a list of (possibly nested) indexes that can 
be used to project
+ * data types. A row projection includes both reducing the accessible fields 
and reordering them.
+ */
+@PublicEvolving
+public abstract class Projection {
+
+    // sealed class
+    private Projection() {}
+
+    /** Project the provided {@link DataType} using this {@link Projection}. */
+    public abstract DataType project(DataType dataType);
+
+    /** @return {@code true} if this projection is nested, {@code false} otherwise. */
+    public abstract boolean isNested();
+
+    /**
+     * Perform a difference of this {@link Projection} with another {@link 
Projection}. The result
+     * of this operation is a new {@link Projection} retaining the same 
ordering of this instance
+     * but with the indexes from {@code other} removed. For example:
+     *
+     * <pre>
+     * <code>
+     * [4, 1, 0, 3, 2] - [4, 2] = [1, 0, 2]
+     * </code>
+     * </pre>
+     *
+     * <p>Note how the index {@code 3} in the minuend becomes {@code 2} because it's rescaled to
+     * correctly project a {@link RowData} of arity 3.
+     *
+     * @param other the subtrahend
+     * @throws IllegalArgumentException when {@code other} is nested.
+     */
+    public abstract Projection difference(Projection other);
+
+    /**
+     * Complement this projection. The returned projection is an ordered 
projection of fields from 0
+     * to {@code fieldsNumber} except the indexes in this {@link Projection}. 
For example:
+     *
+     * <pre>
+     * <code>
+     * [4, 2].complement(5) = [0, 1, 3]
+     * </code>
+     * </pre>
+     *
+     * @param fieldsNumber the size of the universe
+     * @throws IllegalStateException if this projection is nested.
+     */
+    public abstract Projection complement(int fieldsNumber);
+
+    /** Like {@link #complement(int)}, using the {@code dataType} fields 
count. */
+    public Projection complement(DataType dataType) {
+        return complement(DataType.getFieldCount(dataType));
+    }
+
+    /**
+     * Convert this instance to a projection of top level indexes. The array 
represents the mapping
+     * of the fields of the original {@link DataType}. For example, {@code [0, 
2, 1]} specifies to
+     * include in the following order the 1st field, the 3rd field and the 2nd 
field of the row.
+     *
+     * @throws IllegalStateException if this projection is nested.
+     */
+    public abstract int[] toTopLevelIndexes();
+
+    /**
+     * Convert this instance to nested projection index paths. The array represents the mapping of
+     * the fields of the original {@link DataType}, including nested rows. For 
example, {@code [[0,
+     * 2, 1], ...]} specifies to include the 2nd field of the 3rd field of the 
1st field in the
+     * top-level row.
+     */
+    public abstract int[][] toNestedIndexes();
+
+    /**
+     * Create an empty {@link Projection}, that is a projection that projects 
no fields, returning
+     * an empty {@link DataType}.
+     */
+    public static Projection empty() {
+        return EmptyProjection.INSTANCE;
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code indexes}.
+     *
+     * @see #toTopLevelIndexes()
+     */
+    public static Projection of(int[] indexes) {
+        if (indexes.length == 0) {
+            return empty();
+        }
+        return new TopLevelProjection(indexes);
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code indexes}.
+     *
+     * @see #toNestedIndexes()
+     */
+    public static Projection of(int[][] indexes) {
+        if (indexes.length == 0) {
+            return empty();
+        }
+        return new NestedProjection(indexes);
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code dataType} using the 
provided {@code
+     * projectedFields}.
+     */
+    public static Projection fromFieldNames(DataType dataType, List<String> 
projectedFields) {
+        List<String> dataTypeFieldNames = DataType.getFieldNames(dataType);
+        return new TopLevelProjection(
+                
projectedFields.stream().mapToInt(dataTypeFieldNames::indexOf).toArray());
+    }
+
+    /** Create a {@link Projection} of all the fields in the provided {@code 
dataType}. */
+    public static Projection all(DataType dataType) {
+        return new TopLevelProjection(
+                IntStream.range(0, 
DataType.getFieldCount(dataType)).toArray());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Projection)) {
+            return false;
+        }
+        Projection other = (Projection) o;
+        if (!this.isNested() && !other.isNested()) {
+            return Arrays.equals(this.toTopLevelIndexes(), 
other.toTopLevelIndexes());
+        }
+        return Arrays.deepEquals(this.toNestedIndexes(), 
other.toNestedIndexes());
+    }
+
+    @Override
+    public int hashCode() {
+        if (isNested()) {
+            return Arrays.deepHashCode(toNestedIndexes());
+        }
+        return Arrays.hashCode(toTopLevelIndexes());
+    }
+
+    @Override
+    public String toString() {
+        if (isNested()) {
+            return "Nested projection = " + 
Arrays.deepToString(toNestedIndexes());
+        }
+        return "Top level projection = " + 
Arrays.toString(toTopLevelIndexes());
+    }
+
+    private static class EmptyProjection extends Projection {
+
+        static final EmptyProjection INSTANCE = new EmptyProjection();
+
+        private EmptyProjection() {}
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, toTopLevelIndexes());
+        }
+
+        @Override
+        public boolean isNested() {
+            return false;
+        }
+
+        @Override
+        public Projection difference(Projection projection) {
+            return this;
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            return new TopLevelProjection(IntStream.range(0, 
fieldsNumber).toArray());
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            return new int[0];
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return new int[0][];
+        }
+    }
+
+    private static class NestedProjection extends Projection {
+
+        final int[][] projection;
+        final boolean nested;
+
+        NestedProjection(int[][] projection) {
+            this.projection = projection;
+            this.nested = Arrays.stream(projection).anyMatch(arr -> arr.length 
> 1);
+        }
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, projection);
+        }
+
+        @Override
+        public boolean isNested() {
+            return nested;
+        }
+
+        /**
+         * Computes the difference between this projection and {@code other}: every path whose
+         * top-level index appears in {@code other} is dropped, and the top-level index of each
+         * remaining path is decreased by the number of excluded indexes smaller than it.
+         *
+         * <p>This instance is never modified. Paths that need rescaling are copied first, because
+         * the inner arrays are the same objects returned by {@link #toNestedIndexes()}.
+         *
+         * @param other the subtrahend
+         * @return the resulting projection
+         * @throws IllegalArgumentException when {@code other} is nested
+         */
+        @Override
+        public Projection difference(Projection other) {
+            if (other.isNested()) {
+                throw new IllegalArgumentException(
+                        "Cannot perform difference between nested projection and nested projection");
+            }
+            if (other instanceof EmptyProjection) {
+                return this;
+            }
+            if (!this.isNested()) {
+                return new TopLevelProjection(toTopLevelIndexes()).difference(other);
+            }
+
+            // Extract the indexes to exclude and sort them (on a copy, to leave other untouched)
+            int[] indexesToExclude = other.toTopLevelIndexes();
+            indexesToExclude = Arrays.copyOf(indexesToExclude, indexesToExclude.length);
+            Arrays.sort(indexesToExclude);
+
+            List<int[]> resultProjection =
+                    Arrays.stream(projection).collect(Collectors.toCollection(ArrayList::new));
+
+            ListIterator<int[]> resultProjectionIterator = resultProjection.listIterator();
+            while (resultProjectionIterator.hasNext()) {
+                int[] indexArr = resultProjectionIterator.next();
+
+                // Let's check if the index is inside the indexesToExclude array
+                int searchResult = Arrays.binarySearch(indexesToExclude, indexArr[0]);
+                if (searchResult >= 0) {
+                    // Found, we need to remove it
+                    resultProjectionIterator.remove();
+                } else {
+                    // Not found, let's compute the offset.
+                    // Offset is the index where the projection index should be inserted in the
+                    // indexesToExclude array
+                    int offset = (-(searchResult) - 1);
+                    if (offset != 0) {
+                        // Rescale a copy: indexArr is the very array held by this.projection,
+                        // so writing to it would silently corrupt this instance.
+                        int[] rescaled = indexArr.clone();
+                        rescaled[0] = indexArr[0] - offset;
+                        resultProjectionIterator.set(rescaled);
+                    }
+                }
+            }
+
+            return new NestedProjection(resultProjection.toArray(new int[0][]));
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            if (isNested()) {
+                throw new IllegalStateException("Cannot perform complement of 
a nested projection");
+            }
+            return new 
TopLevelProjection(toTopLevelIndexes()).complement(fieldsNumber);
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            if (isNested()) {
+                throw new IllegalStateException(
+                        "Cannot convert a nested projection to a top level 
projection");
+            }
+            return Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray();
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return projection;
+        }
+    }
+
+    private static class TopLevelProjection extends Projection {
+
+        final int[] projection;
+
+        TopLevelProjection(int[] projection) {
+            this.projection = projection;
+        }
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, this.projection);
+        }
+
+        @Override
+        public boolean isNested() {
+            return false;
+        }
+
+        @Override
+        public Projection difference(Projection other) {
+            if (other.isNested()) {
+                throw new IllegalArgumentException(
+                        "Cannot perform difference between top level 
projection and nested projection");
+            }
+            if (other instanceof EmptyProjection) {
+                return this;
+            }
+
+            // Extract the indexes to exclude and sort them
+            int[] indexesToExclude = other.toTopLevelIndexes();
+            indexesToExclude = Arrays.copyOf(indexesToExclude, 
indexesToExclude.length);
+            Arrays.sort(indexesToExclude);
+
+            List<Integer> resultProjection =
+                    Arrays.stream(projection)
+                            .boxed()
+                            .collect(Collectors.toCollection(ArrayList::new));
+
+            ListIterator<Integer> resultProjectionIterator = 
resultProjection.listIterator();
+            while (resultProjectionIterator.hasNext()) {
+                int index = resultProjectionIterator.next();
+
+                // Let's check if the index is inside the indexesToExclude 
array
+                int searchResult = Arrays.binarySearch(indexesToExclude, 
index);
+                if (searchResult >= 0) {
+                    // Found, we need to remove it
+                    resultProjectionIterator.remove();
+                } else {
+                    // Not found, let's compute the offset.
+                    // Offset is the index where the projection index should 
be inserted in the
+                    // indexesToExclude array
+                    int offset = (-(searchResult) - 1);
+                    if (offset != 0) {
+                        resultProjectionIterator.set(index - offset);
+                    }
+                }
+            }
+
+            return new TopLevelProjection(resultProjection.stream().mapToInt(i 
-> i).toArray());
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            int[] indexesToExclude = Arrays.copyOf(projection, 
projection.length);
+            Arrays.sort(indexesToExclude);
+
+            return new TopLevelProjection(
+                    IntStream.range(0, fieldsNumber)
+                            .filter(i -> Arrays.binarySearch(indexesToExclude, 
i) < 0)
+                            .toArray());
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            return projection;
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return Arrays.stream(projection).mapToObj(i -> new int[] 
{i}).toArray(int[][]::new);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
index ffbe672..71061c6 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
@@ -19,8 +19,13 @@
 package org.apache.flink.table.connector.format;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.Collections;
@@ -30,6 +35,53 @@ import java.util.Map;
 /**
  * A {@link Format} for a {@link DynamicTableSource} for reading rows.
  *
+ * <h1>Implementing a {@link DecodingFormat}</h1>
+ *
+ * {@link DecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, 
DataType)} takes a {@code
+ * physicalDataType}. This {@link DataType} has usually been derived from a 
table's {@link
+ * ResolvedSchema} and excludes partition, metadata, and other auxiliary 
columns. The {@code
+ * physicalDataType} should describe exactly the full serialized record. In 
other words: for every
+ * field in the serialized record there is a corresponding field at the same 
position in the {@code
+ * physicalDataType}. Some implementations may decide to be more lenient and 
allow users to omit
+ * fields but this depends on the format characteristics. For example, a CSV 
format implementation
+ * might allow the user to define the schema only for the first 5 of the 10 
total columns available
+ * in each row.
+ *
+ * <p>If the format supports projections, that is it can exclude certain 
fields from being parsed
+ * <b>independently of the fields defined in the schema</b> and <b>can reorder 
fields</b> in the
+ * produced {@link RowData}, then it should implement {@link 
ProjectableDecodingFormat}. {@link
+ * ProjectableDecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, 
DataType, int[][])}
+ * provides the {@code physicalDataType} as described above and provides 
{@code projections} to
+ * compute the type to produce using {@code 
DataType.projectFields(physicalDataType, projections)}.
+ * For example, a JSON format implementation may match the fields based on the 
JSON object keys,
+ * hence it can easily produce {@link RowData} excluding unused object values 
and set values inside
+ * the {@link RowData} using the index provided by the {@code projections} 
array.
+ *
+ * <p>Whenever possible, it's highly recommended to implement {@link ProjectableDecodingFormat}, as
+ * it might help to reduce the data volume when users are reading large 
records but are using only a
+ * small subset of fields.
+ *
+ * <h1>Using a {@link DecodingFormat}</h1>
+ *
+ * {@link DynamicTableSource} that doesn't implement {@link 
SupportsProjectionPushDown} should
+ * invoke {@link 
DecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, DataType)}.
+ * Usually, {@link DynamicTableFactory.Context#getPhysicalRowDataType()} can 
provide the {@code
+ * physicalDataType} (stripped of any fields not available in the serialized 
record).
+ *
+ * <p>{@link DynamicTableSource} implementing {@link 
SupportsProjectionPushDown} should check
+ * whether the {@link DecodingFormat} is an instance of {@link 
ProjectableDecodingFormat}:
+ *
+ * <ul>
+ *   <li>If yes, then the connector can invoke {@link
+ *       
ProjectableDecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, 
DataType,
+ *       int[][])} providing a non null {@code projections} array excluding 
auxiliary fields. The
+ *       built runtime implementation will take care of projections, producing 
records of type
+ *       {@code DataType.projectFields(physicalDataType, projections)}.
+ *   <li>If no, then the connector must take care of performing the 
projection, for example using
+ *       {@link ProjectedRowData} to project physical {@link RowData} emitted 
from the decoder
+ *       runtime implementation.
+ * </ul>
+ *
  * @param <I> runtime interface needed by the table source
  */
 @PublicEvolving
@@ -38,6 +90,10 @@ public interface DecodingFormat<I> extends Format {
     /**
      * Creates runtime decoder implementation that is configured to produce 
data of the given data
      * type.
+     *
+     * @param context the context provides several utilities required to 
instantiate the runtime
+     *     decoder implementation of the format
+     * @param physicalDataType For more details check the documentation of 
{@link DecodingFormat}.
      */
     I createRuntimeDecoder(DynamicTableSource.Context context, DataType 
physicalDataType);
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java
new file mode 100644
index 0000000..467ebf1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * Extension of {@link DecodingFormat} which is able to produce projected rows.
+ *
+ * <p>For more details on usage and differences between {@link DecodingFormat} 
and {@link
+ * ProjectableDecodingFormat}, check the documentation of {@link 
DecodingFormat}.
+ *
+ * @see Projection
+ * @see ProjectedRowData
+ */
+@PublicEvolving
+public interface ProjectableDecodingFormat<I> extends DecodingFormat<I> {
+
+    /** Returns whether this format supports nested projection. */
+    default boolean supportsNestedProjection() {
+        return false;
+    }
+
+    /**
+     * Creates runtime decoder implementation that is configured to produce 
data of type {@code
+     * DataType.projectFields(physicalDataType, projections)}. For more 
details on the usage, check
+     * {@link DecodingFormat} documentation.
+     *
+     * @param context the context provides several utilities required to 
instantiate the runtime
+     *     decoder implementation of the format
+     * @param physicalDataType For more details check {@link DecodingFormat}
+     * @param projections the projections array. The array represents the 
mapping of the fields of
+     *     the original {@link DataType}, including nested rows. For example, 
{@code [[0, 2, 1],
+     *     ...]} specifies to include the 2nd field of the 3rd field of the 
1st field in the
+     *     top-level row. It's guaranteed that this array won't contain nested projections if {@link
+     *     #supportsNestedProjection()} returns {@code false}. For more details, check {@link
+     *     Projection} as well.
+     * @return the runtime decoder
+     * @see DecodingFormat
+     */
+    I createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections);
+
+    default I createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType 
projectedPhysicalDataType) {
+        return createRuntimeDecoder(
+                context,
+                projectedPhysicalDataType,
+                Projection.all(projectedPhysicalDataType).toNestedIndexes());
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
index c63cedb..5022eeb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.connector.source.abilities;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.types.DataType;
 
 /**
@@ -45,6 +47,9 @@ import org.apache.flink.table.types.DataType;
  * different field order). It does not contain any computation. A projection 
can either be performed
  * on the fields of the top-level row only or consider nested fields as well 
(see {@link
  * #supportsNestedProjection()}).
+ *
+ * @see Projection
+ * @see ProjectedRowData
  */
 @PublicEvolving
 public interface SupportsProjectionPushDown {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
new file mode 100644
index 0000000..196796a
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link RowData} which provides a projected view of the 
underlying {@link
+ * RowData}.
+ *
+ * <p>Projection includes both reducing the accessible fields and reordering 
them.
+ *
+ * <p>Note: This class supports only top-level projections, not nested 
projections.
+ */
+@PublicEvolving
+public class ProjectedRowData implements RowData {
+
+    private final int[] indexMapping;
+
+    private RowData row;
+
+    private ProjectedRowData(int[] indexMapping) {
+        this.indexMapping = indexMapping;
+    }
+
+    /**
+     * Replaces the underlying {@link RowData} backing this {@link 
ProjectedRowData}.
+     *
+     * <p>This method replaces the row data in place and does not return a new 
object. This is done
+     * for performance reasons.
+     */
+    public ProjectedRowData replaceRow(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        row.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(indexMapping[pos]);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(indexMapping[pos]);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(indexMapping[pos]);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(indexMapping[pos]);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(indexMapping[pos]);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(indexMapping[pos]);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(indexMapping[pos]);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(indexMapping[pos]);
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return row.getString(indexMapping[pos]);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(indexMapping[pos], precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return row.getTimestamp(indexMapping[pos], precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return row.getRawValue(indexMapping[pos]);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(indexMapping[pos]);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return row.getArray(indexMapping[pos]);
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return row.getMap(indexMapping[pos]);
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return row.getRow(indexMapping[pos], numFields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException("Projected row data cannot be 
compared");
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("Projected row data cannot be 
hashed");
+    }
+
+    @Override
+    public String toString() {
+        return row.getRowKind().shortString()
+                + "{"
+                + "indexMapping="
+                + Arrays.toString(indexMapping)
+                + ", mutableRow="
+                + row
+                + '}';
+    }
+
+    /**
+     * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} 
if the provided {@code
+     * projection} array contains nested projections, which are not supported 
by {@link
+     * ProjectedRowData}.
+     *
+     * <p>The array represents the mapping of the fields of the original 
{@link DataType}, including
+     * nested rows. For example, {@code [[0, 2, 1], ...]} specifies to include 
the 2nd field of the
+     * 3rd field of the 1st field in the top-level row.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(int[][] projection) throws 
IllegalArgumentException {
+        return new ProjectedRowData(
+                Arrays.stream(projection)
+                        .mapToInt(
+                                arr -> {
+                                    if (arr.length != 1) {
+                                        throw new IllegalArgumentException(
+                                                "ProjectedRowData doesn't 
support nested projections");
+                                    }
+                                    return arr[0];
+                                })
+                        .toArray());
+    }
+
+    /**
+     * Create an empty {@link ProjectedRowData} starting from a {@code 
projection} array.
+     *
+     * <p>The array represents the mapping of the fields of the original 
{@link DataType}. For
+     * example, {@code [0, 2, 1]} specifies to include in the following order 
the 1st field, the 3rd
+     * field and the 2nd field of the row.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(int[] projection) {
+        return new ProjectedRowData(projection);
+    }
+
+    /**
+     * Create an empty {@link ProjectedRowData} starting from a {@link 
Projection}.
+     *
+     * <p>Throws {@link IllegalStateException} if the provided {@code projection} is nested, since
+     * nested projections are not supported by {@link ProjectedRowData}.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(Projection projection) {
+        return new ProjectedRowData(projection.toTopLevelIndexes());
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java
new file mode 100644
index 0000000..5b3afba
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ProjectionTest {
+
+    @Test
+    void testTopLevelProject() {
+        assertEquals(
+                ROW(FIELD("f2", INT()), FIELD("f1", STRING())),
+                Projection.of(new int[] {2, 1})
+                        .project(
+                                ROW(
+                                        FIELD("f0", BIGINT()),
+                                        FIELD("f1", STRING()),
+                                        FIELD("f2", INT()))));
+    }
+
+    @Test
+    void testNestedProject() {
+        final DataType thirdLevelRow =
+                ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", 
INT()));
+        final DataType secondLevelRow =
+                ROW(FIELD("b0", BOOLEAN()), FIELD("b1", thirdLevelRow), 
FIELD("b2", INT()));
+        final DataType topLevelRow =
+                ROW(FIELD("a0", INT()), FIELD("a1", secondLevelRow), 
FIELD("a1_b1_c0", INT()));
+
+        assertEquals(
+                ROW(FIELD("a0", INT()), FIELD("a1_b1_c0", BOOLEAN())),
+                Projection.of(new int[][] {{0}, {1, 1, 
0}}).project(topLevelRow));
+        assertEquals(
+                ROW(FIELD("a1_b1", thirdLevelRow), FIELD("a0", INT())),
+                Projection.of(new int[][] {{1, 1}, {0}}).project(topLevelRow));
+        assertEquals(
+                ROW(
+                        FIELD("a1_b1_c2", INT()),
+                        FIELD("a1_b1_c1", DOUBLE()),
+                        FIELD("a1_b1_c0", BOOLEAN())),
+                Projection.of(new int[][] {{1, 1, 2}, {1, 1, 1}, {1, 1, 
0}}).project(topLevelRow));
+        assertEquals(
+                ROW(FIELD("a1_b1_c0", BOOLEAN()), FIELD("a1_b1_c0_$0", INT())),
+                Projection.of(new int[][] {{1, 1, 0}, 
{2}}).project(topLevelRow));
+    }
+
+    @Test
+    void testIsNested() {
+        assertFalse(Projection.of(new int[] {2, 1}).isNested());
+        assertFalse(Projection.of(new int[][] {new int[] {1}, new int[] 
{3}}).isNested());
+        assertTrue(
+                Projection.of(new int[][] {new int[] {1}, new int[] {1, 2}, 
new int[] {3}})
+                        .isNested());
+    }
+
+    @Test
+    void testDifference() {
+        assertEquals(
+                Projection.of(new int[] {1, 0, 2}),
+                Projection.of(new int[] {4, 1, 0, 3, 2})
+                        .difference(Projection.of(new int[] {4, 2})));
+
+        assertEquals(
+                Projection.of(new int[][] {new int[] {1, 3}, new int[] {0}, 
new int[] {2, 1}}),
+                Projection.of(
+                                new int[][] {
+                                    new int[] {4},
+                                    new int[] {1, 3},
+                                    new int[] {0},
+                                    new int[] {3, 1},
+                                    new int[] {2}
+                                })
+                        .difference(Projection.of(new int[] {4, 2})));
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        Projection.of(new int[] {1, 2, 3, 4})
+                                .difference(
+                                        Projection.of(
+                                                new int[][] {new int[] {2}, 
new int[] {3, 4}})));
+    }
+
+    @Test
+    void testComplement() {
+        assertEquals(
+                Projection.of(new int[] {0, 3}), Projection.of(new int[] {4, 
1, 2}).complement(5));
+
+        assertEquals(
+                Projection.of(new int[] {0, 3}),
+                Projection.of(new int[][] {new int[] {4}, new int[] {1}, new 
int[] {2}})
+                        .complement(5));
+
+        assertThrows(
+                IllegalStateException.class,
+                () ->
+                        Projection.of(new int[][] {new int[] {4}, new int[] 
{1, 3}, new int[] {2}})
+                                .complement(10));
+    }
+
+    @Test
+    void testToTopLevelIndexes() {
+        assertArrayEquals(
+                new int[] {1, 2, 3, 4}, Projection.of(new int[] {1, 2, 3, 
4}).toTopLevelIndexes());
+
+        assertArrayEquals(
+                new int[] {4, 1, 2},
+                Projection.of(new int[][] {new int[] {4}, new int[] {1}, new 
int[] {2}})
+                        .toTopLevelIndexes());
+
+        assertThrows(
+                IllegalStateException.class,
+                () ->
+                        Projection.of(new int[][] {new int[] {4}, new int[] 
{1, 3}, new int[] {2}})
+                                .toTopLevelIndexes());
+    }
+
+    @Test
+    void testToNestedIndexes() {
+        assertArrayEquals(
+                new int[][] {new int[] {1}, new int[] {2}, new int[] {3}, new 
int[] {4}},
+                Projection.of(new int[] {1, 2, 3, 4}).toNestedIndexes());
+        assertArrayEquals(
+                new int[][] {new int[] {4}, new int[] {1, 3}, new int[] {2}},
+                Projection.of(new int[][] {new int[] {4}, new int[] {1, 3}, 
new int[] {2}})
+                        .toNestedIndexes());
+    }
+
+    @Test
+    void testEquals() {
+        assertEquals(
+                Projection.of(new int[] {1, 2, 3}),
+                Projection.of(new int[][] {new int[] {1}, new int[] {2}, new 
int[] {3}}));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java
new file mode 100644
index 0000000..9bb2b05
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link ProjectedRowData}. */
+public class ProjectedRowDataTest {
+
+    @Test
+    public void testProjectedRows() {
+        final RowData initialRow = GenericRowData.of(0L, 1L, 2L, 3L, 4L);
+        final ProjectedRowData projectedRowData =
+                ProjectedRowData.from(
+                        new int[][] {new int[] {2}, new int[] {0}, new int[] 
{1}, new int[] {4}});
+        projectedRowData.replaceRow(initialRow);
+
+        assertEquals(RowKind.INSERT, initialRow.getRowKind());
+        assertEquals(4, projectedRowData.getArity());
+        assertEquals(2L, projectedRowData.getLong(0));
+        assertEquals(0L, projectedRowData.getLong(1));
+        assertEquals(1L, projectedRowData.getLong(2));
+        assertEquals(4L, projectedRowData.getLong(3));
+
+        projectedRowData.replaceRow(GenericRowData.of(5L, 6L, 7L, 8L, 9L, 
10L));
+        assertEquals(4, projectedRowData.getArity());
+        assertEquals(7L, projectedRowData.getLong(0));
+        assertEquals(5L, projectedRowData.getLong(1));
+        assertEquals(6L, projectedRowData.getLong(2));
+        assertEquals(9L, projectedRowData.getLong(3));
+    }
+
+    @Test
+    public void testProjectedRowsDoesntSupportNestedProjections() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        ProjectedRowData.from(
+                                new int[][] {
+                                    new int[] {2}, new int[] {0, 1}, new int[] 
{1}, new int[] {4}
+                                }));
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
index 58c7b49..5dcf0ff 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
@@ -30,8 +30,6 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
@@ -163,37 +161,11 @@ class FileInfoExtractorBulkFormat implements 
BulkFormat<RowData, FileSourceSplit
         final EnrichedRowData producedRowData =
                 new EnrichedRowData(fileInfoRowData, 
this.extendedRowIndexMapping);
 
-        return new ReaderWrapper(superReader, producedRowData);
-    }
-
-    private static final class ReaderWrapper implements Reader<RowData> {
-
-        private final Reader<RowData> wrappedReader;
-        private final EnrichedRowData producedRowData;
-
-        private ReaderWrapper(Reader<RowData> wrappedReader, EnrichedRowData 
producedRowData) {
-            this.wrappedReader = wrappedReader;
-            this.producedRowData = producedRowData;
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<RowData> readBatch() throws IOException {
-            RecordIterator<RowData> iterator = wrappedReader.readBatch();
-            if (iterator == null) {
-                return null;
-            }
-            return new RecordMapperWrapperRecordIterator<>(
-                    iterator,
-                    physicalRowData -> {
-                        producedRowData.replaceMutableRow(physicalRowData);
-                        return producedRowData;
-                    });
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.wrappedReader.close();
-        }
+        return RecordMapperWrapperRecordIterator.wrapReader(
+                superReader,
+                physicalRowData -> {
+                    producedRowData.replaceMutableRow(physicalRowData);
+                    return producedRowData;
+                });
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index d1b12a7..09ef76f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -33,8 +33,10 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
@@ -120,38 +122,37 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
             return InputFormatProvider.of(new CollectionInputFormat<>(new 
ArrayList<>(), null));
         }
 
-        // Physical type is computed from the full data type, filtering out 
partition and
-        // metadata columns. This type is going to be used by formats to parse 
the input.
-        List<DataTypes.Field> producedDataTypeFields = 
DataType.getFields(producedDataType);
-        if (metadataKeys != null && !metadataKeys.isEmpty()) {
-            // If metadata keys are present, then by SupportsReadingMetadata 
contract all the
-            // metadata columns will be at the end of the producedDataType, so 
we can just remove
-            // from the list the last metadataKeys.size() fields.
-            producedDataTypeFields =
-                    producedDataTypeFields.subList(
-                            0, producedDataTypeFields.size() - 
metadataKeys.size());
-        }
-        DataType physicalDataType =
-                producedDataTypeFields.stream()
-                        .filter(f -> partitionKeys == null || 
!partitionKeys.contains(f.getName()))
-                        
.collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
-
         // Resolve metadata and make sure to filter out metadata not in the 
producedDataType
-        List<String> metadataKeys =
-                (this.metadataKeys == null) ? Collections.emptyList() : 
this.metadataKeys;
-        metadataKeys =
+        final List<String> metadataKeys =
                 DataType.getFieldNames(producedDataType).stream()
-                        .filter(metadataKeys::contains)
+                        .filter(
+                                ((this.metadataKeys == null)
+                                                ? Collections.emptyList()
+                                                : this.metadataKeys)
+                                        ::contains)
                         .collect(Collectors.toList());
-        List<ReadableFileInfo> metadataToExtract =
+        final List<ReadableFileInfo> metadataToExtract =
                 
metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
 
         // Filter out partition columns not in producedDataType
-        List<String> partitionKeysToExtract =
+        final List<String> partitionKeysToExtract =
                 DataType.getFieldNames(producedDataType).stream()
                         .filter(this.partitionKeys::contains)
                         .collect(Collectors.toList());
 
+        // Compute the physical projection and the physical data type, that is
+        // the type without partition columns and metadata, in the same order 
as the schema
+        DataType physicalDataType = this.schema.toPhysicalRowDataType();
+        final Projection partitionKeysProjections =
+                Projection.fromFieldNames(physicalDataType, 
partitionKeysToExtract);
+        final Projection physicalProjections =
+                (projectFields != null
+                                ? Projection.of(projectFields)
+                                : Projection.all(physicalDataType))
+                        .difference(partitionKeysProjections);
+        physicalDataType =
+                
partitionKeysProjections.complement(physicalDataType).project(physicalDataType);
+
         // TODO FLINK-19845 old format factory, to be removed soon. The old 
factory doesn't support
         //  metadata.
         if (formatFactory != null) {
@@ -176,23 +177,56 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
                     && filters.size() > 0) {
                 ((BulkDecodingFormat<RowData>) 
bulkReaderFormat).applyFilters(filters);
             }
-            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+
+            BulkFormat<RowData, FileSourceSplit> format;
+            if (bulkReaderFormat instanceof ProjectableDecodingFormat) {
+                format =
+                        ((ProjectableDecodingFormat<BulkFormat<RowData, 
FileSourceSplit>>)
+                                        bulkReaderFormat)
+                                .createRuntimeDecoder(
+                                        scanContext,
+                                        physicalDataType,
+                                        physicalProjections.toNestedIndexes());
+            } else {
+                format =
+                        new ProjectingBulkFormat(
+                                bulkReaderFormat.createRuntimeDecoder(
+                                        scanContext, physicalDataType),
+                                physicalProjections.toTopLevelIndexes(),
+                                scanContext.createTypeInformation(
+                                        
physicalProjections.project(physicalDataType)));
+            }
+
+            format =
                     wrapBulkFormat(
-                            bulkReaderFormat.createRuntimeDecoder(scanContext, 
physicalDataType),
-                            producedDataType,
-                            metadataToExtract,
-                            partitionKeysToExtract);
-            return createSourceProvider(bulkFormat);
+                            format, producedDataType, metadataToExtract, 
partitionKeysToExtract);
+            return createSourceProvider(format);
         } else if (deserializationFormat != null) {
-            DeserializationSchema<RowData> decoder =
-                    deserializationFormat.createRuntimeDecoder(scanContext, 
physicalDataType);
-            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+            BulkFormat<RowData, FileSourceSplit> format;
+            if (deserializationFormat instanceof ProjectableDecodingFormat) {
+                format =
+                        new DeserializationSchemaAdapter(
+                                
((ProjectableDecodingFormat<DeserializationSchema<RowData>>)
+                                                deserializationFormat)
+                                        .createRuntimeDecoder(
+                                                scanContext,
+                                                physicalDataType,
+                                                
physicalProjections.toNestedIndexes()));
+            } else {
+                format =
+                        new ProjectingBulkFormat(
+                                new DeserializationSchemaAdapter(
+                                        
deserializationFormat.createRuntimeDecoder(
+                                                scanContext, 
physicalDataType)),
+                                physicalProjections.toTopLevelIndexes(),
+                                scanContext.createTypeInformation(
+                                        
physicalProjections.project(physicalDataType)));
+            }
+
+            format =
                     wrapBulkFormat(
-                            new DeserializationSchemaAdapter(decoder),
-                            producedDataType,
-                            metadataToExtract,
-                            partitionKeysToExtract);
-            return createSourceProvider(bulkFormat);
+                            format, producedDataType, metadataToExtract, 
partitionKeysToExtract);
+            return createSourceProvider(format);
         } else {
             throw new TableException("Can not find format factory.");
         }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java
new file mode 100644
index 0000000..b60dc46
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import 
org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+
+import java.io.IOException;
+
+/**
+ * This {@link BulkFormat} is a wrapper that performs projections for formats 
that don't support
+ * projections.
+ */
+class ProjectingBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+    private final BulkFormat<RowData, FileSourceSplit> wrapped;
+    private final TypeInformation<RowData> producedType;
+
+    private final int[] projections;
+
+    public ProjectingBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> wrapped,
+            int[] projections,
+            TypeInformation<RowData> producedType) {
+        this.wrapped = wrapped;
+        this.projections = projections;
+        this.producedType = producedType;
+    }
+
+    @Override
+    public Reader<RowData> createReader(Configuration config, FileSourceSplit 
split)
+            throws IOException {
+        return wrapReader(wrapped.createReader(config, split), split);
+    }
+
+    @Override
+    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit 
split)
+            throws IOException {
+        return wrapReader(wrapped.restoreReader(config, split), split);
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return wrapped.isSplittable();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedType;
+    }
+
+    private Reader<RowData> wrapReader(Reader<RowData> superReader, 
FileSourceSplit split) {
+        // This row is going to be reused for every record
+        final ProjectedRowData producedRowData = 
ProjectedRowData.from(this.projections);
+
+        return RecordMapperWrapperRecordIterator.wrapReader(
+                superReader,
+                physicalRowData -> {
+                    producedRowData.replaceRow(physicalRowData);
+                    return producedRowData;
+                });
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 03a80e7..0235cd6 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -43,9 +44,7 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
 import static 
org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
@@ -120,24 +119,16 @@ public class TestCsvFileSystemFormatFactory
     @Override
     public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
-        List<String> schemaFields =
-                
DataType.getFieldNames(context.getPhysicalRowDataType()).stream()
-                        .filter(
-                                field ->
-                                        !context.getCatalogTable()
-                                                .getPartitionKeys()
-                                                .contains(field))
-                        .collect(Collectors.toList());
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() 
{
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType 
physicalDataType) {
-                // TestCsvDeserializationSchema has no knowledge of the field 
names, and the
-                // implicit assumption done by tests is that the csv rows are 
composed by only the
-                // physical fields (excluding partition fields) in the same 
order as defined in the
-                // table declaration. This is why TestCsvDeserializationSchema 
needs
-                // schemaFields.
-                return new TestCsvDeserializationSchema(physicalDataType, 
schemaFields);
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                DataType projectedPhysicalDataType =
+                        DataType.projectFields(physicalDataType, projections);
+                return new TestCsvDeserializationSchema(
+                        projectedPhysicalDataType, 
DataType.getFieldNames(physicalDataType));
             }
 
             @Override

Reply via email to