tsreaper commented on a change in pull request #17520: URL: https://github.com/apache/flink/pull/17520#discussion_r748035668
##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
##########
@@ -20,39 +20,84 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.PartitionPathUtils;

 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;

 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;

 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;

 /** Avro format factory for file system. */
 @Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

     public static final String IDENTIFIER = "avro";

+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        return new BulkDecodingFormat<RowData>() {
+            @Override
+            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
+                DataType physicalDataType =
+                        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

Review comment:
Love this change. It simplifies code a lot.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org