slinkydeveloper commented on a change in pull request #17520:
URL: https://github.com/apache/flink/pull/17520#discussion_r745773693



##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -20,39 +20,84 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static 
org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro format factory for file system. */
 @Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, 
BulkWriterFormatFactory {
 
     public static final String IDENTIFIER = "avro";
 
+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
+        return new BulkDecodingFormat<RowData>() {
+            @Override
+            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                    DynamicTableSource.Context sourceContext, DataType 
producedDataType) {
+                DataType physicalDataType =
+                        
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

Review comment:
       Can you please rebase on master? I introduced a change that relieves format 
implementations of having to take care of partition keys: 
https://github.com/apache/flink/commit/1ea210278f1da41f193a7520306cab20596fd10f.
 You can clean up all the code that supports partition keys, and you can also 
revert the move of `FileSystemConnectorOptions`.

##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -90,6 +135,112 @@ public String factoryIdentifier() {
         return options;
     }
 
+    private static class AvroGenericRecordBulkFormat
+            extends AbstractAvroBulkFormat<GenericRecord, RowData, 
FileSourceSplit> {
+
+        private static final long serialVersionUID = 1L;
+
+        // all physical fields in the source schema
+        private final DataType physicalDataType;
+        // projected physical fields
+        private final DataType producedDataType;
+        TypeInformation<RowData> producedTypeInfo;
+        private final List<String> partitionKeys;
+        private final String defaultPartitionValue;
+
+        private transient AvroToRowDataConverters.AvroToRowDataConverter 
converter;
+        private transient GenericRecord reusedAvroRecord;
+        private transient GenericRowData reusedRowData;
+        // we should fill i-th field of reusedRowData with
+        // readerRowTypeIndex[i]-th field of reusedAvroRecord
+        private transient int[] readerRowTypeIndex;
+
+        public AvroGenericRecordBulkFormat(
+                DynamicTableSource.Context context,
+                DataType physicalDataType,
+                DataType producedDataType,
+                List<String> partitionKeys,
+                String defaultPartitionValue) {
+            // partition keys are stored in file paths, not in avro file 
contents
+            super(
+                    AvroSchemaConverter.convertToSchema(
+                            getNotNullRowTypeWithExclusion(producedDataType, 
partitionKeys)));
+            this.physicalDataType = physicalDataType;
+            this.producedDataType = producedDataType;
+            this.producedTypeInfo = 
context.createTypeInformation(producedDataType);
+            this.partitionKeys = partitionKeys;
+            this.defaultPartitionValue = defaultPartitionValue;
+        }
+
+        @Override
+        protected void open(FileSourceSplit split) {
+            RowType readerRowType = 
getNotNullRowTypeWithExclusion(producedDataType, partitionKeys);
+
+            converter = 
AvroToRowDataConverters.createRowConverter(readerRowType);
+            reusedAvroRecord = new GenericData.Record(readerSchema);
+
+            List<String> physicalFieldNames = 
DataType.getFieldNames(physicalDataType);
+            int[] selectFieldIndices =
+                    DataType.getFieldNames(producedDataType).stream()
+                            .mapToInt(physicalFieldNames::indexOf)
+                            .toArray();

Review comment:
       Same as above: there is no need to project fields etc. Just use the 
`producedDataType` provided to you in `createRuntimeDecoder`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to