This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f9f9541e2e405bae9db2743c13f6a1b1f187c066
Author: sivabalan <n.siv...@gmail.com>
AuthorDate: Tue Dec 13 16:45:56 2022 -0800

    Revert "[HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340)"

    This reverts commit 1b8e5ec12e300ed76c74fdddb585ede55c66b28a.
---
 .../apache/hudi/configuration/OptionsResolver.java |  18 --
 .../java/org/apache/hudi/table/format/CastMap.java | 223 ----------------
 .../org/apache/hudi/table/format/FormatUtils.java  |  28 --
 .../hudi/table/format/InternalSchemaManager.java   | 170 ------------
 .../table/format/ParquetSplitRecordIterator.java   |  61 -----
 .../apache/hudi/table/format/RecordIterators.java  |  91 -------
 .../table/format/SchemaEvolvedRecordIterator.java  |  52 ----
 .../table/format/cow/CopyOnWriteInputFormat.java   |  27 +-
 .../table/format/mor/MergeOnReadInputFormat.java   | 290 ++++++++++-----------
 .../apache/hudi/util/RowDataCastProjection.java    |  49 ----
 .../org/apache/hudi/util/RowDataProjection.java    |   9 +-
 11 files changed, 151 insertions(+), 867 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 029ac3c4027..0dd31ee7538 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -202,24 +202,6 @@ public class OptionsResolver {
     return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT);
   }
 
-<<<<<<< HEAD
-=======
-  /**
-   * Returns the supplemental logging mode.
-   */
-  public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) {
-    String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
-    return HoodieCDCSupplementalLoggingMode.parse(mode);
-  }
-
-  /**
-   * Returns whether comprehensive schema evolution enabled.
-   */
-  public static boolean isSchemaEvolutionEnabled(Configuration conf) {
-    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
-  }
-
->>>>>>> 07cc3e89a7 ([HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340))
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
deleted file mode 100644
index 36cf8708875..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.format; - -import org.apache.hudi.common.util.Option; -import org.apache.hudi.util.RowDataCastProjection; -import org.apache.hudi.util.RowDataProjection; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.binary.BinaryStringData; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; - -import javax.annotation.Nullable; -import java.io.Serializable; -import java.math.BigDecimal; -import java.time.LocalDate; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; - -/** - * CastMap is responsible for conversion of flink types when full schema evolution enabled. 
- * - * <p>Supported cast conversions: - * <ul> - * <li>Integer => Long, Float, Double, Decimal, String</li> - * <li>Long => Float, Double, Decimal, String</li> - * <li>Float => Double, Decimal, String</li> - * <li>Double => Decimal, String</li> - * <li>Decimal => Decimal, String</li> - * <li>String => Decimal, Date</li> - * <li>Date => String</li> - * </ul> - */ -public final class CastMap implements Serializable { - - private static final long serialVersionUID = 1L; - - // Maps position to corresponding cast - private final Map<Integer, Cast> castMap = new HashMap<>(); - - private DataType[] fileFieldTypes; - - public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) { - if (castMap.isEmpty()) { - return Option.empty(); - } - LogicalType[] requiredType = new LogicalType[selectedFields.length]; - for (int i = 0; i < selectedFields.length; i++) { - requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType(); - } - return Option.of(new RowDataCastProjection(requiredType, this)); - } - - public Object castIfNeeded(int pos, Object val) { - Cast cast = castMap.get(pos); - if (cast == null) { - return val; - } - return cast.convert(val); - } - - public DataType[] getFileFieldTypes() { - return fileFieldTypes; - } - - public void setFileFieldTypes(DataType[] fileFieldTypes) { - this.fileFieldTypes = fileFieldTypes; - } - - @VisibleForTesting - void add(int pos, LogicalType fromType, LogicalType toType) { - Function<Object, Object> conversion = getConversion(fromType, toType); - if (conversion == null) { - throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos)); - } - add(pos, new Cast(fromType, toType, conversion)); - } - - private @Nullable Function<Object, Object> getConversion(LogicalType fromType, LogicalType toType) { - LogicalTypeRoot from = fromType.getTypeRoot(); - LogicalTypeRoot to = toType.getTypeRoot(); - switch (to) { - case BIGINT: { - if (from == INTEGER) { - return val -> ((Number) val).longValue(); - } - break; - } - case FLOAT: { - if (from == INTEGER || from == BIGINT) { - return val -> ((Number) val).floatValue(); - } - break; - } - case DOUBLE: { - if (from == INTEGER || from == BIGINT) { - return val -> ((Number) val).doubleValue(); - } - if (from == FLOAT) { - return val -> Double.parseDouble(val.toString()); - } - break; - } - case DECIMAL: { - if (from == INTEGER || from == BIGINT || from == DOUBLE) { - return val -> toDecimalData((Number) val, toType); - } - if (from == FLOAT) { - return val -> toDecimalData(Double.parseDouble(val.toString()), toType); - } - if (from == VARCHAR) { - return val -> toDecimalData(Double.parseDouble(val.toString()), toType); - } - if (from == DECIMAL) { - return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType); - } - break; - } - case VARCHAR: { - if (from == INTEGER - || from == BIGINT - || from == FLOAT - || from == DOUBLE - || from == DECIMAL) { - return val -> new BinaryStringData(String.valueOf(val)); - } - if (from == DATE) { - return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString()); - } - break; - } - case DATE: { - if (from == VARCHAR) { - return val -> (int) LocalDate.parse(val.toString()).toEpochDay(); - } - break; - } - default: - } - return null; - } - - private void add(int pos, Cast cast) { - castMap.put(pos, cast); - } - - private DecimalData toDecimalData(Number val, LogicalType decimalType) { - BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue()); - return 
toDecimalData(valAsDecimal, decimalType); - } - - private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) { - return DecimalData.fromBigDecimal( - valAsDecimal, - ((DecimalType) decimalType).getPrecision(), - ((DecimalType) decimalType).getScale()); - } - - /** - * Fields {@link Cast#from} and {@link Cast#to} are redundant due to {@link Cast#convert(Object)} determines conversion. - * However, it is convenient to debug {@link CastMap} when {@link Cast#toString()} prints types. - */ - private static final class Cast implements Serializable { - - private static final long serialVersionUID = 1L; - - private final LogicalType from; - private final LogicalType to; - private final Function<Object, Object> conversion; - - Cast(LogicalType from, LogicalType to, Function<Object, Object> conversion) { - this.from = from; - this.to = to; - this.conversion = conversion; - } - - Object convert(Object val) { - return conversion.apply(val); - } - - @Override - public String toString() { - return from + " => " + to; - } - } - - @Override - public String toString() { - return castMap.entrySet().stream() - .map(e -> e.getKey() + ": " + e.getValue()) - .collect(Collectors.joining(", ", "{", "}")); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 49cb3cec5bf..6357b898d49 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; @@ -171,33 +170,6 @@ public class FormatUtils { .build(); } - public static HoodieMergedLogRecordScanner logScanner( - MergeOnReadInputSplit split, - Schema logSchema, - InternalSchema internalSchema, - org.apache.flink.configuration.Configuration flinkConf, - Configuration hadoopConf) { - HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf); - FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf); - return HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(split.getTablePath()) - .withLogFilePaths(split.getLogPaths().get()) - .withReaderSchema(logSchema) - .withInternalSchema(internalSchema) - .withLatestInstantTime(split.getLatestCommit()) - .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) - .withReverseReader(false) - .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) - .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes()) - .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType()) - .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) - .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath()) - .withInstantRange(split.getInstantRange()) - .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) - .build(); - } - /** * Utility to read and buffer the records in the unMerged log record scanner. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java deleted file mode 100644 index abd405469d8..00000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.format; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.InternalSchemaCache; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.configuration.HadoopConfigurations; -import org.apache.hudi.configuration.OptionsResolver; -import org.apache.hudi.internal.schema.InternalSchema; -import org.apache.hudi.internal.schema.Type; -import org.apache.hudi.internal.schema.Types; -import org.apache.hudi.internal.schema.action.InternalSchemaMerger; -import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; -import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; -import org.apache.hudi.util.AvroSchemaConverter; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.types.DataType; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -/** - * This class is responsible for calculating names and types of fields that are actual at a certain point in time. - * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time. - * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one. 
- */ -public class InternalSchemaManager implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null); - - private final Configuration conf; - private final InternalSchema querySchema; - private final String validCommits; - private final String tablePath; - private transient org.apache.hadoop.conf.Configuration hadoopConf; - - public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) { - if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) { - return DISABLED; - } - Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata(); - if (!internalSchema.isPresent() || internalSchema.get().isEmptySchema()) { - return DISABLED; - } - String validCommits = metaClient - .getCommitsAndCompactionTimeline() - .filterCompletedInstants() - .getInstantsAsStream() - .map(HoodieInstant::getFileName) - .collect(Collectors.joining(",")); - return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePathV2().toString()); - } - - public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath) { - this.conf = conf; - this.querySchema = querySchema; - this.validCommits = validCommits; - this.tablePath = tablePath; - } - - public InternalSchema getQuerySchema() { - return querySchema; - } - - InternalSchema getFileSchema(String fileName) { - if (querySchema.isEmptySchema()) { - return querySchema; - } - long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName)); - InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId( - commitInstantTime, tablePath, getHadoopConf(), validCommits); - if (querySchema.equals(fileSchemaUnmerged)) { - return InternalSchema.getEmptyInternalSchema(); - } - return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema(); - } - - CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { - Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); - Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); - - CastMap castMap = new CastMap(); - Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames); - if (posProxy.isEmpty()) { - castMap.setFileFieldTypes(queryFieldTypes); - return castMap; - } - List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); - List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( - AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren(); - DataType[] fileFieldTypes = new DataType[queryFieldTypes.length]; - for (int i = 0; i < queryFieldTypes.length; i++) { - Integer posOfChangedType = posProxy.get(i); - if (posOfChangedType == null) { - fileFieldTypes[i] = queryFieldTypes[i]; - } else { - DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType); - fileFieldTypes[i] = fileType; - int selectedPos = selectedFieldList.indexOf(i); - if (selectedPos != -1) { - castMap.add(selectedPos, fileType.getLogicalType(), queryFieldTypes[i].getLogicalType()); - } - } - } - castMap.setFileFieldTypes(fileFieldTypes); - return castMap; - } - - String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) { - 
Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); - Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); - - Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema); - if (renamedCols.isEmpty()) { - return queryFieldNames; - } - return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new); - } - - private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) { - Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema); - HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size()); - List<String> fieldNameList = Arrays.asList(queryFieldNames); - List<Types.Field> columns = querySchema.columns(); - changedCols.forEach((posInSchema, typePair) -> { - String name = columns.get(posInSchema).name(); - int posInType = fieldNameList.indexOf(name); - posProxy.put(posInType, posInSchema); - }); - return Collections.unmodifiableMap(posProxy); - } - - private org.apache.hadoop.conf.Configuration getHadoopConf() { - if (hadoopConf == null) { - hadoopConf = HadoopConfigurations.getHadoopConf(conf); - } - return hadoopConf; - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java deleted file mode 100644 index 7b26d71f115..00000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.format; - -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; - -import org.apache.flink.table.data.RowData; - -import java.io.IOException; - -/** - * Hoodie wrapper for flink parquet reader. 
- */ -public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> { - private final ParquetColumnarRowSplitReader reader; - - public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) { - this.reader = reader; - } - - @Override - public boolean hasNext() { - try { - return !reader.reachedEnd(); - } catch (IOException e) { - throw new HoodieIOException("Decides whether the parquet columnar row split reader reached end exception", e); - } - } - - @Override - public RowData next() { - return reader.nextRecord(); - } - - @Override - public void close() { - try { - reader.close(); - } catch (IOException e) { - throw new HoodieIOException("Close the parquet columnar row split reader exception", e); - } - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java deleted file mode 100644 index 8657f16ddc9..00000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.format; - -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.internal.schema.InternalSchema; -import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; -import org.apache.hudi.util.RowDataProjection; - -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.DataType; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.util.Map; - -/** - * Factory clazz for record iterators. 
- */ -public abstract class RecordIterators { - - public static ClosableIterator<RowData> getParquetRecordIterator( - InternalSchemaManager internalSchemaManager, - boolean utcTimestamp, - boolean caseSensitive, - Configuration conf, - String[] fieldNames, - DataType[] fieldTypes, - Map<String, Object> partitionSpec, - int[] selectedFields, - int batchSize, - Path path, - long splitStart, - long splitLength) throws IOException { - InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName()); - if (fileSchema.isEmptySchema()) { - return new ParquetSplitRecordIterator( - ParquetSplitReaderUtil.genPartColumnarRowReader( - utcTimestamp, - caseSensitive, - conf, - fieldNames, - fieldTypes, - partitionSpec, - selectedFields, - batchSize, - path, - splitStart, - splitLength)); - } else { - CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields); - Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields); - ClosableIterator<RowData> itr = new ParquetSplitRecordIterator( - ParquetSplitReaderUtil.genPartColumnarRowReader( - utcTimestamp, - caseSensitive, - conf, - internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names - castMap.getFileFieldTypes(), // the reconciled field types - partitionSpec, - selectedFields, - batchSize, - path, - splitStart, - splitLength)); - if (castProjection.isPresent()) { - return new SchemaEvolvedRecordIterator(itr, castProjection.get()); - } else { - return itr; - } - } - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java deleted file mode 100644 index 739512c7b55..00000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.format; - -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.util.RowDataProjection; - -import org.apache.flink.table.data.RowData; - -/** - * Decorates origin record iterator with cast projection. 
- */ -public final class SchemaEvolvedRecordIterator implements ClosableIterator<RowData> { - private final ClosableIterator<RowData> nested; - private final RowDataProjection castProjection; - - public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested, RowDataProjection castProjection) { - this.nested = nested; - this.castProjection = castProjection; - } - - @Override - public boolean hasNext() { - return nested.hasNext(); - } - - @Override - public RowData next() { - return castProjection.project(nested.next()); - } - - @Override - public void close() { - nested.close(); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 820424549f0..c5ea3d4ab98 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -20,9 +20,7 @@ package org.apache.hudi.table.format.cow; import java.util.Comparator; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.table.format.InternalSchemaManager; -import org.apache.hudi.table.format.RecordIterators; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.util.DataTypeUtils; import org.apache.flink.api.common.io.FileInputFormat; @@ -76,7 +74,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { private final SerializableConfiguration conf; private final long limit; - private transient ClosableIterator<RowData> itr; + private transient ParquetColumnarRowSplitReader reader; private transient long currentReadCount; /** @@ -84,8 +82,6 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { */ private FilePathFilter localFilesFilter = new GlobFilePathFilter(); - private final InternalSchemaManager internalSchemaManager; - public CopyOnWriteInputFormat( Path[] paths, String[] fullFieldNames, @@ -94,8 +90,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { String partDefaultName, long limit, Configuration conf, - boolean utcTimestamp, - InternalSchemaManager internalSchemaManager) { + boolean utcTimestamp) { super.setFilePaths(paths); this.limit = limit; this.partDefaultName = partDefaultName; @@ -104,7 +99,6 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { this.selectedFields = selectedFields; this.conf = new SerializableConfiguration(conf); this.utcTimestamp = utcTimestamp; - this.internalSchemaManager = internalSchemaManager; } @Override @@ -129,8 +123,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } }); - this.itr = RecordIterators.getParquetRecordIterator( - internalSchemaManager, + this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader( utcTimestamp, true, conf.conf(), @@ -277,26 +270,26 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } @Override - public boolean reachedEnd() { + public boolean reachedEnd() throws IOException { if (currentReadCount >= limit) { return true; } else { - return !itr.hasNext(); + return reader.reachedEnd(); } } @Override public RowData nextRecord(RowData reuse) { currentReadCount++; - return itr.next(); + return reader.nextRecord(); } @Override public void close() throws IOException { - if 
(itr != null) { - this.itr.close(); + if (reader != null) { + this.reader.close(); } - this.itr = null; + this.reader = null; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index e1e7025ff07..c9b6561bdef 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,13 +29,11 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; -import org.apache.hudi.table.format.InternalSchemaManager; -import org.apache.hudi.table.format.RecordIterators; +import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; +import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.RowDataProjection; @@ -68,7 +66,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.function.Function; import java.util.stream.IntStream; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; @@ -95,7 +92,7 @@ public class MergeOnReadInputFormat /** * Uniform iterator view for the underneath records. 
*/ - private transient ClosableIterator<RowData> iterator; + private transient RecordIterator iterator; // for project push down /** @@ -140,16 +137,13 @@ public class MergeOnReadInputFormat */ private boolean closed = true; - private final InternalSchemaManager internalSchemaManager; - private MergeOnReadInputFormat( Configuration conf, MergeOnReadTableState tableState, List<DataType> fieldTypes, String defaultPartName, long limit, - boolean emitDelete, - InternalSchemaManager internalSchemaManager) { + boolean emitDelete) { this.conf = conf; this.tableState = tableState; this.fieldNames = tableState.getRowType().getFieldNames(); @@ -160,7 +154,6 @@ public class MergeOnReadInputFormat this.requiredPos = tableState.getRequiredPositions(); this.limit = limit; this.emitDelete = emitDelete; - this.internalSchemaManager = internalSchemaManager; } /** @@ -175,35 +168,30 @@ public class MergeOnReadInputFormat this.currentReadCount = 0L; this.closed = false; this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); - this.iterator = initIterator(split); - mayShiftInputSplit(split); - } - - protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException { if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) { if (split.getInstantRange() != null) { // base file only with commit time filtering - return new BaseFileOnlyFilteringIterator( + this.iterator = new BaseFileOnlyFilteringIterator( split.getInstantRange(), this.tableState.getRequiredRowType(), - getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); + getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); } else { // base file only - return getBaseFileIterator(split.getBasePath().get()); + this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get())); } } else if (!split.getBasePath().isPresent()) { // log files only if (OptionsResolver.emitChangelog(conf)) { - return new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); + this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split)); } else { - return new LogFileOnlyIterator(getLogFileIterator(split)); + this.iterator = new LogFileOnlyIterator(getLogFileIterator(split)); } } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) { - return new SkipMergeIterator( - getBaseFileIterator(split.getBasePath().get()), + this.iterator = new SkipMergeIterator( + getRequiredSchemaReader(split.getBasePath().get()), getLogFileIterator(split)); } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) { - return new MergeIterator( + this.iterator = new MergeIterator( conf, hadoopConf, split, @@ -211,11 +199,10 @@ public class MergeOnReadInputFormat this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), - internalSchemaManager.getQuerySchema(), this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), - getBaseFileIteratorWithMetadata(split.getBasePath().get())); + getFullSchemaReader(split.getBasePath().get())); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -224,6 +211,7 @@ public class MergeOnReadInputFormat + "spark partition Index: " + split.getSplitNumber() + "merge type: " + split.getMergeType()); } + mayShiftInputSplit(split); } @Override @@ -254,14 
+242,14 @@ public class MergeOnReadInputFormat return true; } else { // log file reaches end ? - return !this.iterator.hasNext(); + return this.iterator.reachedEnd(); } } @Override public RowData nextRecord(RowData o) { currentReadCount++; - return this.iterator.next(); + return this.iterator.nextRecord(); } @Override @@ -296,19 +284,15 @@ public class MergeOnReadInputFormat } } - protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String path) { - try { - return getBaseFileIterator(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); - } catch (IOException e) { - throw new HoodieException("Get reader error for path: " + path); - } + private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException { + return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); } - protected ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException { - return getBaseFileIterator(path, this.requiredPos); + private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException { + return getReader(path, this.requiredPos); } - private ClosableIterator<RowData> getBaseFileIterator(String path, int[] requiredPos) throws IOException { + private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException { // generate partition specs. LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues( new org.apache.hadoop.fs.Path(path).getParent(), @@ -330,8 +314,7 @@ public class MergeOnReadInputFormat } }); - return RecordIterators.getParquetRecordIterator( - internalSchemaManager, + return ParquetSplitReaderUtil.genPartColumnarRowReader( this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, HadoopConfigurations.getParquetConf(this.conf, hadoopConf), @@ -477,12 +460,46 @@ public class MergeOnReadInputFormat // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- + private interface RecordIterator { + boolean reachedEnd() throws IOException; + + RowData nextRecord(); + + void close() throws IOException; + } + + static class BaseFileOnlyIterator implements RecordIterator { + // base file reader + private final ParquetColumnarRowSplitReader reader; + + BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) { + this.reader = reader; + } + + @Override + public boolean reachedEnd() throws IOException { + return this.reader.reachedEnd(); + } + + @Override + public RowData nextRecord() { + return this.reader.nextRecord(); + } + + @Override + public void close() throws IOException { + if (this.reader != null) { + this.reader.close(); + } + } + } + /** - * Base record iterator with instant time filtering. + * Similar with {@link BaseFileOnlyIterator} but with instant time filtering. 
*/ - static class BaseFileOnlyFilteringIterator implements ClosableIterator<RowData> { - // base file record iterator - private final ClosableIterator<RowData> nested; + static class BaseFileOnlyFilteringIterator implements RecordIterator { + // base file reader + private final ParquetColumnarRowSplitReader reader; private final InstantRange instantRange; private final RowDataProjection projection; @@ -491,44 +508,44 @@ public class MergeOnReadInputFormat BaseFileOnlyFilteringIterator( Option<InstantRange> instantRange, RowType requiredRowType, - ClosableIterator<RowData> nested) { - this.nested = nested; + ParquetColumnarRowSplitReader reader) { + this.reader = reader; this.instantRange = instantRange.orElse(null); int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray(); projection = RowDataProjection.instance(requiredRowType, positions); } @Override - public boolean hasNext() { - while (this.nested.hasNext()) { - currentRecord = this.nested.next(); + public boolean reachedEnd() throws IOException { + while (!this.reader.reachedEnd()) { + currentRecord = this.reader.nextRecord(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (isInRange) { - return true; + return false; } } else { - return true; + return false; } } - return false; + return true; } @Override - public RowData next() { + public RowData nextRecord() { // can promote: no need to project with null instant range return projection.project(currentRecord); } @Override - public void close() { - if (this.nested != null) { - this.nested.close(); + public void close() throws IOException { + if (this.reader != null) { + this.reader.close(); } } } - protected static class LogFileOnlyIterator implements ClosableIterator<RowData> { + static class LogFileOnlyIterator implements RecordIterator { // iterator for log files private final ClosableIterator<RowData> iterator; @@ -537,12 +554,12 @@ public class MergeOnReadInputFormat } @Override - public boolean hasNext() { - return this.iterator.hasNext(); + public boolean reachedEnd() { + return !this.iterator.hasNext(); } @Override - public RowData next() { + public RowData nextRecord() { return this.iterator.next(); } @@ -554,9 +571,9 @@ public class MergeOnReadInputFormat } } - static class SkipMergeIterator implements ClosableIterator<RowData> { - // base file record iterator - private final ClosableIterator<RowData> nested; + static class SkipMergeIterator implements RecordIterator { + // base file reader + private final ParquetColumnarRowSplitReader reader; // iterator for log files private final ClosableIterator<RowData> iterator; @@ -567,34 +584,34 @@ public class MergeOnReadInputFormat private RowData currentRecord; - SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) { - this.nested = nested; + SkipMergeIterator(ParquetColumnarRowSplitReader reader, ClosableIterator<RowData> iterator) { + this.reader = reader; this.iterator = iterator; } @Override - public boolean hasNext() { - if (!readLogs && this.nested.hasNext()) { - currentRecord = this.nested.next(); - return true; + public boolean reachedEnd() throws IOException { + if (!readLogs && !this.reader.reachedEnd()) { + currentRecord = this.reader.nextRecord(); + return false; } readLogs = true; if (this.iterator.hasNext()) { currentRecord = this.iterator.next(); - return true; + return false; } - return false; + return true; } @Override - public RowData next() { + public RowData 
nextRecord() { return currentRecord; } @Override - public void close() { - if (this.nested != null) { - this.nested.close(); + public void close() throws IOException { + if (this.reader != null) { + this.reader.close(); } if (this.iterator != null) { this.iterator.close(); @@ -602,22 +619,24 @@ public class MergeOnReadInputFormat } } - protected static class MergeIterator implements ClosableIterator<RowData> { - // base file record iterator - private final ClosableIterator<RowData> nested; + static class MergeIterator implements RecordIterator { + // base file reader + private final ParquetColumnarRowSplitReader reader; // log keys used for merging private final Iterator<String> logKeysIterator; // scanner private final HoodieMergedLogRecordScanner scanner; private final Schema tableSchema; + private final Schema requiredSchema; + private final int[] requiredPos; private final boolean emitDelete; private final int operationPos; private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter; private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; + private final GenericRecordBuilder recordBuilder; - private final Option<RowDataProjection> projection; - private final Option<Function<IndexedRecord, GenericRecord>> avroProjection; + private final RowDataProjection projection; private final InstantRange instantRange; @@ -640,49 +659,30 @@ public class MergeOnReadInputFormat RowType requiredRowType, Schema tableSchema, Schema requiredSchema, - InternalSchema querySchema, int[] requiredPos, boolean emitDelete, int operationPos, - ClosableIterator<RowData> nested) { // the iterator should be with full schema - this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema, - querySchema, - Option.of(RowDataProjection.instance(requiredRowType, requiredPos)), - Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))), - emitDelete, operationPos, nested); - } - - public MergeIterator( - Configuration flinkConf, - org.apache.hadoop.conf.Configuration hadoopConf, - MergeOnReadInputSplit split, - RowType tableRowType, - RowType requiredRowType, - Schema tableSchema, - InternalSchema querySchema, - Option<RowDataProjection> projection, - Option<Function<IndexedRecord, GenericRecord>> avroProjection, - boolean emitDelete, - int operationPos, - ClosableIterator<RowData> nested) { // the iterator should be with full schema + ParquetColumnarRowSplitReader reader) { // the reader should be with full schema this.tableSchema = tableSchema; - this.nested = nested; - this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, flinkConf, hadoopConf); + this.reader = reader; + this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf); this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps(); this.logKeysIterator = scanner.getRecords().keySet().iterator(); + this.requiredSchema = requiredSchema; + this.requiredPos = requiredPos; this.emitDelete = emitDelete; this.operationPos = operationPos; - this.avroProjection = avroProjection; + this.recordBuilder = new GenericRecordBuilder(requiredSchema); this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); - this.projection = projection; + this.projection = RowDataProjection.instance(requiredRowType, requiredPos); this.instantRange = split.getInstantRange().orElse(null); } 
@Override - public boolean hasNext() { - while (!readLogs && this.nested.hasNext()) { - currentRecord = this.nested.next(); + public boolean reachedEnd() throws IOException { + while (!readLogs && !this.reader.reachedEnd()) { + currentRecord = this.reader.nextRecord(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (!isInRange) { @@ -703,19 +703,19 @@ public class MergeOnReadInputFormat // deleted continue; } - IndexedRecord avroRecord = avroProjection.isPresent() - ? avroProjection.get().apply(mergedAvroRecord.get()) - : mergedAvroRecord.get(); + GenericRecord avroRecord = buildAvroRecordBySchema( + mergedAvroRecord.get(), + requiredSchema, + requiredPos, + recordBuilder); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); this.currentRecord.setRowKind(rowKind); - return true; + return false; } } // project the full record in base with required positions - if (projection.isPresent()) { - currentRecord = projection.get().project(currentRecord); - } - return true; + currentRecord = projection.project(currentRecord); + return false; } // read the logs readLogs = true; @@ -725,53 +725,49 @@ public class MergeOnReadInputFormat Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey); if (insertAvroRecord.isPresent()) { // the record is a DELETE if insertAvroRecord not present, skipping - IndexedRecord avroRecord = avroProjection.isPresent() - ? avroProjection.get().apply(insertAvroRecord.get()) - : insertAvroRecord.get(); + GenericRecord avroRecord = buildAvroRecordBySchema( + insertAvroRecord.get(), + requiredSchema, + requiredPos, + recordBuilder); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos); - return true; + return false; } } } - return false; + return true; } - private Option<IndexedRecord> getInsertValue(String curKey) { + private Option<IndexedRecord> getInsertValue(String curKey) throws IOException { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) { return Option.empty(); } - try { - return record.getData().getInsertValue(tableSchema); - } catch (IOException e) { - throw new HoodieIOException("Get insert value from payload exception", e); - } + return record.getData().getInsertValue(tableSchema); } @Override - public RowData next() { + public RowData nextRecord() { return currentRecord; } @Override - public void close() { - if (this.nested != null) { - this.nested.close(); + public void close() throws IOException { + if (this.reader != null) { + this.reader.close(); } if (this.scanner != null) { this.scanner.close(); } } - private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String curKey) { + private Option<IndexedRecord> mergeRowWithLog( + RowData curRow, + String curKey) throws IOException { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - try { - return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); - } catch (IOException e) { - throw new HoodieIOException("Merge base and delta payloads exception", e); - } + return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); } } @@ -779,13 +775,12 @@ public class 
MergeOnReadInputFormat * Builder for {@link MergeOnReadInputFormat}. */ public static class Builder { - protected Configuration conf; - protected MergeOnReadTableState tableState; - protected List<DataType> fieldTypes; - protected String defaultPartName; - protected long limit = -1; - protected boolean emitDelete = false; - protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED; + private Configuration conf; + private MergeOnReadTableState tableState; + private List<DataType> fieldTypes; + private String defaultPartName; + private long limit = -1; + private boolean emitDelete = false; public Builder config(Configuration conf) { this.conf = conf; @@ -817,14 +812,9 @@ public class MergeOnReadInputFormat return this; } - public Builder internalSchemaManager(InternalSchemaManager internalSchemaManager) { - this.internalSchemaManager = internalSchemaManager; - return this; - } - public MergeOnReadInputFormat build() { return new MergeOnReadInputFormat(conf, tableState, fieldTypes, - defaultPartName, limit, emitDelete, internalSchemaManager); + defaultPartName, limit, emitDelete); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java deleted file mode 100644 index 55e85aa1f60..00000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.util; - -import org.apache.hudi.table.format.CastMap; - -import org.apache.flink.table.types.logical.LogicalType; - -import javax.annotation.Nullable; -import java.util.stream.IntStream; - -/** - * This class is responsible to project row as well as {@link RowDataProjection}. - * In addition, fields are converted according to the CastMap. 
- */ -public final class RowDataCastProjection extends RowDataProjection { - private static final long serialVersionUID = 1L; - - private final CastMap castMap; - - public RowDataCastProjection(LogicalType[] types, CastMap castMap) { - super(types, IntStream.range(0, types.length).toArray()); - this.castMap = castMap; - } - - @Override - protected @Nullable Object getVal(int pos, @Nullable Object val) { - if (val == null) { - return null; - } - return castMap.castIfNeeded(pos, val); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 2e3e8a2ed32..8076d982b99 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -25,8 +25,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import javax.annotation.Nullable; - import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -39,7 +37,7 @@ public class RowDataProjection implements Serializable { private final RowData.FieldGetter[] fieldGetters; - protected RowDataProjection(LogicalType[] types, int[] positions) { + private RowDataProjection(LogicalType[] types, int[] positions) { ValidationUtils.checkArgument(types.length == positions.length, "types and positions should have the equal number"); this.fieldGetters = new RowData.FieldGetter[types.length]; @@ -88,9 +86,4 @@ public class RowDataProjection implements Serializable { } return values; } - - protected @Nullable - Object getVal(int pos, @Nullable Object val) { - return val; - } }