stevenzwu commented on code in PR #6584:
URL: https://github.com/apache/iceberg/pull/6584#discussion_r1073036477
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java: ##########

```diff
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Read Iceberg rows as {@link GenericRecord}. */
+public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<GenericRecord> {
+  private final String tableName;
+  private final Schema readSchema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final RowDataFileScanTaskReader rowDataReader;
+
+  private transient RowDataToAvroGenericRecordConverter converter;
+
+  /**
+   * Create a reader function without projection and name mapping. Column name is case-insensitive.
+   */
+  public static AvroGenericRecordReaderFunction fromTable(Table table) {
+    return new AvroGenericRecordReaderFunction(
+        table.name(),
+        new Configuration(),
+        table.schema(),
+        null,
+        null,
+        false,
```

Review Comment:

> caseSensitive is used a little more frequently.

I am not sure about that. I would think projection might be common too. Name mapping might also be useful if the Avro schema is not identical (i.e. does not have the same fields in the same order), although we don't support name mapping for Avro records right now.

> Could we provide an enum for the user to choose from and then help build it automatically in the IcebergSourceBuilder instead of letting the user create it themselves?

I agree with that point. By default, users shouldn't need to construct the `RowDataReaderFunction`, since `IcebergSource` builds it internally if none is set. We accept a `ReaderFunction` in `IcebergSource#Builder` to support more customized readers like this one.

As for `BaseMetadataTable`, users probably only use it indirectly, via Flink SQL, to inspect metadata tables, right? I doubt users would ever need to construct it programmatically.
```java
// Default used when no ReaderFunction has been set on the builder.
if (readerFunction == null) {
  if (table instanceof BaseMetadataTable) {
    // Metadata tables: no name mapping or case-sensitivity options are passed.
    MetaDataReaderFunction rowDataReaderFunction =
        new MetaDataReaderFunction(
            flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
    this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
  } else {
    // Regular tables: RowData reader built from the scan context options.
    RowDataReaderFunction rowDataReaderFunction =
        new RowDataReaderFunction(
            flinkConfig,
            table.schema(),
            context.project(),
            context.nameMapping(),
            context.caseSensitive(),
            table.io(),
            table.encryption());
    this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
  }
}
```
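The enum idea quoted in the review could look roughly like the following. This is a minimal sketch under stated assumptions: `ReaderKind` and `resolveReader` are hypothetical names that do not exist in Iceberg, and reader functions are modelled as class-name strings so the example runs without Flink or Iceberg on the classpath. It keeps the precedence of the builder snippet above: an explicitly set reader wins, metadata tables get `MetaDataReaderFunction`, and otherwise the chosen kind decides.

```java
/**
 * Hypothetical sketch: a user-chosen enum instead of hand-built reader functions. Reader
 * functions are modelled as class-name strings, so nothing here is a real Iceberg type.
 */
public class ReaderKindSketch {

  /** Output kinds a user could pick on the source builder (hypothetical). */
  public enum ReaderKind {
    ROW_DATA,
    AVRO_GENERIC_RECORD
  }

  /**
   * Picks the reader the builder would construct.
   *
   * @param customReader reader explicitly set by the user, or null
   * @param kind output kind chosen by the user; null falls back to RowData
   * @param metadataTable whether the table being read is a metadata table
   */
  public static String resolveReader(String customReader, ReaderKind kind, boolean metadataTable) {
    if (customReader != null) {
      // A user-supplied ReaderFunction still takes precedence over any default.
      return customReader;
    }
    if (metadataTable) {
      // Metadata tables keep their dedicated reader, mirroring the BaseMetadataTable branch.
      return "MetaDataReaderFunction";
    }
    ReaderKind effective = (kind == null) ? ReaderKind.ROW_DATA : kind;
    switch (effective) {
      case AVRO_GENERIC_RECORD:
        return "AvroGenericRecordReaderFunction";
      case ROW_DATA:
      default:
        return "RowDataReaderFunction";
    }
  }

  public static void main(String[] args) {
    System.out.println(resolveReader(null, ReaderKind.AVRO_GENERIC_RECORD, false));
    System.out.println(resolveReader(null, ReaderKind.AVRO_GENERIC_RECORD, true));
    System.out.println(resolveReader("MyCustomReader", ReaderKind.ROW_DATA, false));
  }
}
```

Checking the explicit reader first keeps the existing escape hatch on `IcebergSource#Builder` for customized readers, while the enum would cover the common output types without users assembling constructor arguments such as `caseSensitive` themselves.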
