This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1bf6b44fcd1f feat(flink): add lance format for Flink append only table
(#18741)
1bf6b44fcd1f is described below
commit 1bf6b44fcd1f0b682d2c246b620a0413f2ef432a
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 27 13:04:29 2026 +0800
feat(flink): add lance format for Flink append only table (#18741)
---
.../row/HoodieBloomFilterRowDataWriteSupport.java | 22 +-
.../io/storage/row/HoodieFlinkLanceArrowUtils.java | 305 +++++++++++++++++++++
.../io/storage/row/HoodieRowDataCreateHandle.java | 2 +-
.../row/HoodieRowDataFileWriterFactory.java | 55 ++++
.../io/storage/row/HoodieRowDataLanceWriter.java | 167 +++++++++++
.../row/HoodieRowDataParquetWriteSupport.java | 12 -
.../row/TestHoodieFlinkLanceArrowUtils.java | 95 +++++++
.../org/apache/hudi/table/HoodieTableFactory.java | 46 +++-
.../table/format/FlinkRowDataReaderContext.java | 21 +-
.../format/HoodieRowDataFileReaderFactory.java | 6 +
.../table/format/HoodieRowDataLanceReader.java | 301 ++++++++++++++++++++
.../table/format/cow/CopyOnWriteInputFormat.java | 76 +++--
.../java/org/apache/hudi/util/StreamerUtil.java | 19 ++
.../apache/hudi/table/ITTestHoodieDataSource.java | 36 +--
.../table/TestHoodieFileGroupReaderOnFlink.java | 8 +
.../apache/hudi/table/TestHoodieTableFactory.java | 62 ++++-
.../hudi/table/catalog/TestHoodieCatalog.java | 30 ++
.../format/TestFlinkRowDataReaderContext.java | 10 -
18 files changed, 1164 insertions(+), 109 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
similarity index 59%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
copy to
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
index 2e7401d4caa0..36d9e28be3e2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-package org.apache.hudi.table.format;
+package org.apache.hudi.io.storage.row;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.StringUtils;
/**
- * Factory methods to create RowData file reader.
+ * Bloom-filter footer support for Flink RowData base-file writers.
*/
-public class HoodieRowDataFileReaderFactory extends HoodieFileReaderFactory {
- public HoodieRowDataFileReaderFactory(HoodieStorage storage) {
- super(storage);
+class HoodieBloomFilterRowDataWriteSupport extends
HoodieBloomFilterWriteSupport<String> {
+
+ HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
+ super(bloomFilter);
}
@Override
- protected HoodieFileReader newParquetFileReader(StoragePath path) {
- return new HoodieRowDataParquetReader(storage, path);
+ protected byte[] getUTF8Bytes(String key) {
+ return StringUtils.getUTF8Bytes(key);
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java
new file mode 100644
index 000000000000..07171f615b46
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
+/**
+ * Primitive RowData/Arrow conversion helpers for Flink Lance base files.
+ */
+public final class HoodieFlinkLanceArrowUtils {
+
+ private HoodieFlinkLanceArrowUtils() { // static helpers only; the private constructor blocks instantiation
+ }
+
+  /**
+   * Converts a Flink {@link RowType} into an Arrow {@link Schema}.
+   *
+   * <p>Each row field is mapped through {@code toArrowField}, so the Arrow fields keep the
+   * Flink field names and order.
+   *
+   * @param rowType the Flink row type to convert
+   * @return an Arrow schema with one field per row field
+   * @throws HoodieNotSupportedException if a field type has no Arrow mapping
+   */
+  public static Schema toArrowSchema(RowType rowType) {
+    List<Field> arrowFields = new ArrayList<>(rowType.getFieldCount());
+    rowType.getFields().forEach(rowField ->
+        arrowFields.add(toArrowField(rowField.getName(), rowField.getType())));
+    return new Schema(arrowFields);
+  }
+
+ public static RowType toRowType(Schema schema) {
+ List<RowType.RowField> fields = new ArrayList<>(schema.getFields().size());
+ for (Field field : schema.getFields()) {
+ fields.add(new RowType.RowField(field.getName(),
toLogicalType(field.getType())));
+ }
+ return new RowType(fields);
+ }
+
+ public static RowData toRowData(RowType rowType, List<FieldVector> vectors,
int rowId) {
+ GenericRowData rowData = new GenericRowData(vectors.size());
+ for (int i = 0; i < vectors.size(); i++) {
+ FieldVector vector = vectors.get(i);
+ if (vector.isNull(rowId)) {
+ rowData.setField(i, null);
+ } else {
+ rowData.setField(i, readValue(rowType.getTypeAt(i), vector, rowId));
+ }
+ }
+ return rowData;
+ }
+
+  /**
+   * Writes one field of {@code rowData} into {@code vector} with the UTC timestamp flag enabled.
+   *
+   * @param type    Flink type of the field
+   * @param vector  destination Arrow vector
+   * @param rowId   destination index inside the vector
+   * @param rowData source row
+   * @param ordinal position of the field inside {@code rowData}
+   */
+  public static void writeValue(LogicalType type, FieldVector vector, int rowId, RowData rowData, int ordinal) {
+    // Callers that do not pass the flag get utcTimestamp = true.
+    final boolean defaultUtcTimestamp = true;
+    writeValue(type, vector, rowId, rowData, ordinal, defaultUtcTimestamp);
+  }
+
+  /**
+   * Writes the field at {@code ordinal} of {@code rowData} into {@code vector} at {@code rowId}.
+   *
+   * <p>A null field is written as an Arrow null. Otherwise the vector is cast to the Arrow
+   * vector class that corresponds to the Flink type root (for example {@code BigIntVector}
+   * for BIGINT), so {@code vector} must be of that class.
+   *
+   * @param type         Flink type of the field
+   * @param vector       destination Arrow vector
+   * @param rowId        destination index inside the vector
+   * @param rowData      source row
+   * @param ordinal      position of the field inside {@code rowData}
+   * @param utcTimestamp forwarded to {@code timestampToMicros} for timestamp fields
+   * @throws HoodieNotSupportedException if the type root is not one of the handled primitives
+   */
+  public static void writeValue(LogicalType type, FieldVector vector, int rowId, RowData rowData, int ordinal, boolean utcTimestamp) {
+    if (rowData.isNullAt(ordinal)) {
+      vector.setNull(rowId);
+      return;
+    }
+    switch (type.getTypeRoot()) {
+      case BOOLEAN:
+        // BitVector stores booleans as 0/1 integers.
+        int bit = rowData.getBoolean(ordinal) ? 1 : 0;
+        ((BitVector) vector).setSafe(rowId, bit);
+        break;
+      case TINYINT:
+        ((TinyIntVector) vector).setSafe(rowId, rowData.getByte(ordinal));
+        break;
+      case SMALLINT:
+        ((SmallIntVector) vector).setSafe(rowId, rowData.getShort(ordinal));
+        break;
+      case INTEGER:
+        ((IntVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        break;
+      case DATE:
+        ((DateDayVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        break;
+      case TIME_WITHOUT_TIME_ZONE:
+        ((TimeMilliVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        break;
+      case BIGINT:
+        ((BigIntVector) vector).setSafe(rowId, rowData.getLong(ordinal));
+        break;
+      case FLOAT:
+        ((Float4Vector) vector).setSafe(rowId, rowData.getFloat(ordinal));
+        break;
+      case DOUBLE:
+        ((Float8Vector) vector).setSafe(rowId, rowData.getDouble(ordinal));
+        break;
+      case CHAR:
+      case VARCHAR:
+        ((VarCharVector) vector).setSafe(rowId, rowData.getString(ordinal).toBytes());
+        break;
+      case BINARY:
+      case VARBINARY:
+        ((VarBinaryVector) vector).setSafe(rowId, rowData.getBinary(ordinal));
+        break;
+      case DECIMAL:
+        DecimalType decType = (DecimalType) type;
+        BigDecimal decimalValue =
+            rowData.getDecimal(ordinal, decType.getPrecision(), decType.getScale()).toBigDecimal();
+        ((DecimalVector) vector).setSafe(rowId, decimalValue);
+        break;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        // Both timestamp flavors are stored as epoch microseconds.
+        int tsPrecision = getPrecision(type);
+        TimestampData ts = rowData.getTimestamp(ordinal, tsPrecision);
+        ((TimeStampMicroVector) vector).setSafe(rowId, timestampToMicros(ts, tsPrecision, utcTimestamp));
+        break;
+      default:
+        throw unsupported(type);
+    }
+  }
+
+ private static Object readValue(LogicalType type, ValueVector vector, int
rowId) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return ((BitVector) vector).get(rowId) == 1;
+ case TINYINT:
+ return ((TinyIntVector) vector).get(rowId);
+ case SMALLINT:
+ return ((SmallIntVector) vector).get(rowId);
+ case INTEGER:
+ return ((IntVector) vector).get(rowId);
+ case DATE:
+ return ((DateDayVector) vector).get(rowId);
+ case TIME_WITHOUT_TIME_ZONE:
+ return ((TimeMilliVector) vector).get(rowId);
+ case BIGINT:
+ return ((BigIntVector) vector).get(rowId);
+ case FLOAT:
+ return ((Float4Vector) vector).get(rowId);
+ case DOUBLE:
+ return ((Float8Vector) vector).get(rowId);
+ case CHAR:
+ case VARCHAR:
+ return StringData.fromBytes(((VarCharVector) vector).get(rowId));
+ case BINARY:
+ case VARBINARY:
+ return ((VarBinaryVector) vector).get(rowId);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ BigDecimal decimal = ((DecimalVector) vector).getObject(rowId);
+ return DecimalData.fromBigDecimal(decimal, decimalType.getPrecision(),
decimalType.getScale());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ long micros = ((TimeStampMicroVector) vector).get(rowId);
+ return TimestampData.fromEpochMillis(Math.floorDiv(micros, 1000L),
(int) Math.floorMod(micros, 1000L) * 1000);
+ default:
+ throw unsupported(type);
+ }
+ }
+
+  /**
+   * Builds a nullable, child-less Arrow {@link Field} named {@code name} whose Arrow type is
+   * derived from the Flink {@code type}.
+   */
+  private static Field toArrowField(String name, LogicalType type) {
+    FieldType nullableFieldType = FieldType.nullable(toArrowType(type));
+    return new Field(name, nullableFieldType, Collections.emptyList());
+  }
+
+ private static ArrowType toArrowType(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ return ArrowType.Bool.INSTANCE;
+ case TINYINT:
+ return new ArrowType.Int(8, true);
+ case SMALLINT:
+ return new ArrowType.Int(16, true);
+ case INTEGER:
+ return new ArrowType.Int(32, true);
+ case BIGINT:
+ return new ArrowType.Int(64, true);
+ case FLOAT:
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ case DOUBLE:
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ case CHAR:
+ case VARCHAR:
+ return ArrowType.Utf8.INSTANCE;
+ case BINARY:
+ case VARBINARY:
+ return ArrowType.Binary.INSTANCE;
+ case DATE:
+ return new ArrowType.Date(DateUnit.DAY);
+ case TIME_WITHOUT_TIME_ZONE:
+ return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return new ArrowType.Decimal(decimalType.getPrecision(),
decimalType.getScale(), 128);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+ default:
+ throw unsupported(type);
+ }
+ }
+
+  /**
+   * Maps an Arrow type read from a Lance file back to a Flink logical type.
+   *
+   * <p>Notes on what is and is not inspected:
+   * <ul>
+   *   <li>Ints are dispatched on bit width only (8/16/32/64).</li>
+   *   <li>Any floating-point precision other than SINGLE maps to DOUBLE.</li>
+   *   <li>Timestamps map to precision 6; a null timezone yields {@link TimestampType},
+   *       any non-null timezone yields {@link LocalZonedTimestampType}.</li>
+   * </ul>
+   *
+   * @throws HoodieNotSupportedException for unsupported int widths or Arrow types
+   */
+  private static LogicalType toLogicalType(ArrowType arrowType) {
+    if (arrowType instanceof ArrowType.Bool) {
+      return new BooleanType();
+    }
+    if (arrowType instanceof ArrowType.Int) {
+      int bitWidth = ((ArrowType.Int) arrowType).getBitWidth();
+      switch (bitWidth) {
+        case 8:
+          return new TinyIntType();
+        case 16:
+          return new SmallIntType();
+        case 32:
+          return new IntType();
+        case 64:
+          return new BigIntType();
+        default:
+          throw new HoodieNotSupportedException("Unsupported Arrow int width for Lance Flink reader: " + bitWidth);
+      }
+    }
+    if (arrowType instanceof ArrowType.FloatingPoint) {
+      FloatingPointPrecision fpPrecision = ((ArrowType.FloatingPoint) arrowType).getPrecision();
+      if (fpPrecision == FloatingPointPrecision.SINGLE) {
+        return new FloatType();
+      }
+      return new DoubleType();
+    }
+    if (arrowType instanceof ArrowType.Utf8) {
+      return new VarCharType();
+    }
+    if (arrowType instanceof ArrowType.Binary) {
+      return new VarBinaryType();
+    }
+    if (arrowType instanceof ArrowType.Date) {
+      return new DateType();
+    }
+    if (arrowType instanceof ArrowType.Time) {
+      return new TimeType();
+    }
+    if (arrowType instanceof ArrowType.Decimal) {
+      ArrowType.Decimal arrowDecimal = (ArrowType.Decimal) arrowType;
+      return new DecimalType(arrowDecimal.getPrecision(), arrowDecimal.getScale());
+    }
+    if (arrowType instanceof ArrowType.Timestamp) {
+      String timezone = ((ArrowType.Timestamp) arrowType).getTimezone();
+      if (timezone == null) {
+        return new TimestampType(6);
+      }
+      return new LocalZonedTimestampType(6);
+    }
+    throw new HoodieNotSupportedException("Unsupported Arrow type for Lance Flink reader: " + arrowType);
+  }
+
+  /**
+   * Converts a Flink timestamp to epoch microseconds.
+   *
+   * <p>With {@code utcTimestamp} set, the value is {@code getMillisecond() * 1000}, plus the
+   * sub-millisecond micros when {@code precision > 3}. Without it, the value is derived from
+   * {@code toTimestamp().getTime()} and only millisecond resolution is kept.
+   *
+   * @param timestampData the timestamp to convert
+   * @param precision     declared precision of the Flink timestamp type
+   * @param utcTimestamp  selects which of the two conversions above is applied
+   * @return the timestamp as microseconds since the epoch
+   */
+  private static long timestampToMicros(TimestampData timestampData, int precision, boolean utcTimestamp) {
+    if (!utcTimestamp) {
+      // Non-UTC path: millisecond resolution only, sub-millisecond digits are not added.
+      return timestampData.toTimestamp().getTime() * 1000L;
+    }
+    long micros = timestampData.getMillisecond() * 1000L;
+    if (precision > 3) {
+      micros += timestampData.getNanoOfMillisecond() / 1000L;
+    }
+    return micros;
+  }
+
+  /**
+   * Creates (but does not throw) the exception reported for Flink types that the Lance
+   * conversion helpers do not handle.
+   */
+  private static HoodieNotSupportedException unsupported(LogicalType type) {
+    String message = "Flink Lance base-file support currently supports primitive append-only columns; unsupported type: " + type;
+    return new HoodieNotSupportedException(message);
+  }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 45333cf4b5db..dbccd3975012 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -294,6 +294,6 @@ public class HoodieRowDataCreateHandle implements
Serializable {
throws IOException {
StoragePath storagePath = new StoragePath(path.toUri());
return (HoodieRowDataFileWriter) new
HoodieRowDataFileWriterFactory(hoodieTable.getStorage())
- .newParquetFileWriter(instantTime, storagePath, config, rowType,
hoodieTable.getTaskContextSupplier());
+ .getFileWriter(instantTime, storagePath, config, rowType,
hoodieTable.getTaskContextSupplier());
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index be3242164a40..49671d37f108 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -23,8 +23,10 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieParquetConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +37,7 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.flink.table.types.logical.RowType;
@@ -44,6 +47,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
/**
@@ -55,6 +62,30 @@ public class HoodieRowDataFileWriterFactory extends
HoodieFileWriterFactory {
super(storage);
}
+ public HoodieFileWriter getFileWriter(String instantTime, StoragePath
storagePath, HoodieWriteConfig config, RowType rowType,
+ TaskContextSupplier taskContextSupplier)
throws IOException {
+ final String extension = FSUtils.getFileExtension(storagePath.getName());
+ return getFileWriterByFormat(extension, instantTime, storagePath, config,
rowType, taskContextSupplier);
+ }
+
+  /**
+   * Dispatches to the format-specific writer factory method based on {@code extension}.
+   *
+   * <p>Parquet and Lance writers are created from the Flink {@link RowType} directly; HFile
+   * and ORC writers receive the schema produced by {@code HoodieSchemaConverter.convertToSchema}.
+   *
+   * @param extension           file extension, compared against the known base-file formats
+   * @param instantTime         instant time of the write
+   * @param path                path of the file to create
+   * @param config              write config
+   * @param rowType             Flink row type of the records
+   * @param taskContextSupplier task context supplier passed through to the writer
+   * @return the writer for the matching format
+   * @throws IOException if the underlying writer cannot be created
+   * @throws UnsupportedOperationException if {@code extension} matches no supported format
+   */
+  private HoodieFileWriter getFileWriterByFormat(
+      String extension, String instantTime, StoragePath path, HoodieConfig config, RowType rowType,
+      TaskContextSupplier taskContextSupplier) throws IOException {
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return newParquetFileWriter(instantTime, path, config, rowType, taskContextSupplier);
+    }
+    if (HFILE.getFileExtension().equals(extension)) {
+      return newHFileFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier);
+    }
+    if (ORC.getFileExtension().equals(extension)) {
+      return newOrcFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier);
+    }
+    if (LANCE.getFileExtension().equals(extension)) {
+      return newLanceFileWriter(instantTime, path, config, rowType, taskContextSupplier);
+    }
+    throw new UnsupportedOperationException(extension + " format not supported yet.");
+  }
+
/**
* Create a parquet writer on a given OutputStream.
*
@@ -136,6 +167,30 @@ public class HoodieRowDataFileWriterFactory extends
HoodieFileWriterFactory {
instantTime, taskContextSupplier, populateMetaFields, withOperation);
}
+  /**
+   * Creates a {@link HoodieRowDataLanceWriter} for {@code path}, configured from {@code config}.
+   *
+   * <p>A bloom filter is created only when {@code enableBloomFilter} allows it for the
+   * current meta-field setting; otherwise the writer receives an empty option.
+   *
+   * @param instantTime         instant time of the write
+   * @param path                path of the Lance file to create
+   * @param config              source of the Lance size/allocator settings and write flags
+   * @param rowType             Flink row type of the records
+   * @param taskContextSupplier task context supplier passed through to the writer
+   * @return the Lance writer
+   */
+  public HoodieFileWriter newLanceFileWriter(
+      String instantTime,
+      StoragePath path,
+      HoodieConfig config,
+      RowType rowType,
+      TaskContextSupplier taskContextSupplier) {
+    final boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
+    final boolean withOperation = config.getBooleanOrDefault(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD);
+
+    final Option<org.apache.hudi.common.bloom.BloomFilter> bloomFilter;
+    if (enableBloomFilter(populateMetaFields, config)) {
+      bloomFilter = Option.of(createBloomFilter(config));
+    } else {
+      bloomFilter = Option.empty();
+    }
+
+    final long maxFileSize = config.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE);
+    final long allocatorSize = config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_ALLOCATOR_SIZE_BYTES);
+    final long flushByteWatermark = config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_FLUSH_BYTE_WATERMARK);
+    final boolean utcTimestamp = config.getBooleanOrDefault(HoodieStorageConfig.WRITE_UTC_TIMEZONE);
+
+    return new HoodieRowDataLanceWriter(
+        path, rowType, instantTime, taskContextSupplier, bloomFilter,
+        maxFileSize, allocatorSize, flushByteWatermark,
+        utcTimestamp, populateMetaFields, withOperation);
+  }
+
private static HoodieParquetConfig<HoodieRowDataParquetWriteSupport>
getParquetConfig(
HoodieConfig config, HoodieRowDataParquetWriteSupport writeSupport) {
return new HoodieParquetConfig<>(
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
new file mode 100644
index 000000000000..f4388c23fc43
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.model.HoodieRowDataCreation;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Lance writer for Flink {@link RowData} append-only base files.
+ */
+public class HoodieRowDataLanceWriter extends HoodieBaseLanceWriter<RowData,
String>
+ implements HoodieRowDataFileWriter {
+
+ private static final long MIN_RECORDS_FOR_SIZE_CHECK = 100L;
+ private static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000L;
+
+ private final RowType rowType;
+ private final Schema arrowSchema;
+ private final String fileName;
+ private final String instantTime;
+ private final long maxFileSize;
+ private final boolean utcTimestamp;
+ private final boolean populateMetaFields;
+ private final boolean withOperation;
+ private final Function<Long, String> seqIdGenerator;
+ private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK;
+
+  /**
+   * Creates a Lance writer for Flink rows.
+   *
+   * @param file                path of the Lance file; its name is stamped into the meta fields
+   * @param rowType             Flink row type of the records, also used to derive the Arrow schema
+   * @param instantTime         instant time of the write
+   * @param taskContextSupplier provides the partition id used for sequence ids
+   * @param bloomFilterOpt      optional bloom filter that receives the written record keys
+   * @param maxFileSize         size threshold used by {@link #canWrite()}; must be positive
+   * @param allocatorSize       passed to the base writer; must be positive
+   * @param flushByteWatermark  passed to the base writer; must be positive and below
+   *                            {@code allocatorSize}
+   * @param utcTimestamp        timestamp conversion flag forwarded to the Arrow conversion
+   * @param populateMetaFields  whether {@link #writeRowWithMetaData} adds the Hudi meta fields
+   * @param withOperation       whether the operation meta field is included
+   * @throws IllegalArgumentException if a size argument violates the constraints above
+   */
+  public HoodieRowDataLanceWriter(
+      StoragePath file,
+      RowType rowType,
+      String instantTime,
+      TaskContextSupplier taskContextSupplier,
+      Option<BloomFilter> bloomFilterOpt,
+      long maxFileSize,
+      long allocatorSize,
+      long flushByteWatermark,
+      boolean utcTimestamp,
+      boolean populateMetaFields,
+      boolean withOperation) {
+    // The super call has to be the first statement, so the arguments are validated right after it.
+    super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark,
+        bloomFilterOpt.map(HoodieBloomFilterRowDataWriteSupport::new));
+    ValidationUtils.checkArgument(maxFileSize > 0, "maxFileSize must be a positive number");
+    ValidationUtils.checkArgument(allocatorSize > 0, "allocatorSize must be a positive number");
+    ValidationUtils.checkArgument(flushByteWatermark > 0, "flushByteWatermark must be a positive number");
+    String watermarkMessage = "flushByteWatermark (" + flushByteWatermark + ") must be less than allocatorSize ("
+        + allocatorSize + ")";
+    ValidationUtils.checkArgument(flushByteWatermark < allocatorSize, watermarkMessage);
+
+    // Schema-related state.
+    this.rowType = rowType;
+    this.arrowSchema = HoodieFlinkLanceArrowUtils.toArrowSchema(rowType);
+
+    // Write behavior flags and limits.
+    this.maxFileSize = maxFileSize;
+    this.utcTimestamp = utcTimestamp;
+    this.populateMetaFields = populateMetaFields;
+    this.withOperation = withOperation;
+
+    // Values stamped into the record meta fields.
+    this.fileName = file.getName();
+    this.instantTime = instantTime;
+    this.seqIdGenerator = recordIndex -> HoodieRecord.generateSequenceId(
+        instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex);
+  }
+
+  /**
+   * Tells whether another record may be written without exceeding {@code maxFileSize}.
+   *
+   * <p>The size is only inspected once the written record count reaches the next checkpoint.
+   * At a checkpoint the file is considered full when the data size is within two average
+   * records of the limit. Otherwise the next checkpoint is moved forward by half of the
+   * estimated remaining records, clamped to
+   * [{@code MIN_RECORDS_FOR_SIZE_CHECK}, {@code MAX_RECORDS_FOR_SIZE_CHECK}].
+   *
+   * @return {@code false} when the size limit is about to be reached, {@code true} otherwise
+   */
+  @Override
+  public boolean canWrite() {
+    long written = getWrittenRecordCount();
+    if (written < recordCountForNextSizeCheck) {
+      // Not at a checkpoint yet: skip the size computation.
+      return true;
+    }
+    long dataSize = getDataSize();
+    long avgRecordSize = Math.max(dataSize / written, 1);
+    if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+      return false;
+    }
+    long halfOfRemaining = (maxFileSize / avgRecordSize - written) / 2;
+    long step = Math.min(Math.max(MIN_RECORDS_FOR_SIZE_CHECK, halfOfRemaining), MAX_RECORDS_FOR_SIZE_CHECK);
+    recordCountForNextSizeCheck = written + step;
+    return true;
+  }
+
+  /**
+   * Writes {@code row} as-is and, when a bloom filter is configured, registers {@code key}
+   * with it first.
+   *
+   * @param key record key added to the bloom filter
+   * @param row row to append to the file
+   * @throws IOException if the underlying write fails
+   */
+  @Override
+  public void writeRow(String key, RowData row) throws IOException {
+    if (bloomFilterWriteSupportOpt.isPresent()) {
+      bloomFilterWriteSupportOpt.get().addKey(key);
+    }
+    super.write(row);
+  }
+
+  /**
+   * Writes {@code row} under the record key of {@code key}. When meta fields are populated,
+   * the row is first wrapped with the Hudi metadata columns.
+   *
+   * @param key Hoodie key providing the record key and partition path
+   * @param row data row to write
+   * @throws IOException if the underlying write fails
+   */
+  @Override
+  public void writeRowWithMetaData(HoodieKey key, RowData row) throws IOException {
+    RowData rowToWrite = populateMetaFields
+        ? updateRecordMetadata(row, key, getWrittenRecordCount())
+        : row;
+    writeRow(key.getRecordKey(), rowToWrite);
+  }
+
+  /**
+   * Creates the writer that copies {@link RowData} fields into the vectors of {@code root}.
+   */
+  @Override
+  protected ArrowWriter<RowData> createArrowWriter(VectorSchemaRoot root) {
+    RowDataArrowWriter rowDataWriter = new RowDataArrowWriter(root);
+    return rowDataWriter;
+  }
+
+  /**
+   * Returns the Arrow schema derived from the Flink row type at construction time.
+   */
+  @Override
+  protected Schema getArrowSchema() {
+    return this.arrowSchema;
+  }
+
+  /**
+   * Appends rows to the vectors of a {@link VectorSchemaRoot}, one vector per field of the
+   * enclosing writer's row type. It is an inner class because it reads {@code rowType} and
+   * {@code utcTimestamp} from the enclosing writer.
+   */
+  private class RowDataArrowWriter implements ArrowWriter<RowData> {
+    private final VectorSchemaRoot root;
+    // Index of the next row inside the current batch.
+    private int nextRowIndex;
+
+    private RowDataArrowWriter(VectorSchemaRoot root) {
+      this.root = root;
+    }
+
+    /** Writes every field of {@code row} at the current batch position, then advances it. */
+    @Override
+    public void write(RowData row) {
+      final int fieldCount = rowType.getFieldCount();
+      for (int pos = 0; pos < fieldCount; pos++) {
+        HoodieFlinkLanceArrowUtils.writeValue(
+            rowType.getTypeAt(pos), root.getVector(pos), nextRowIndex, row, pos, utcTimestamp);
+      }
+      nextRowIndex++;
+    }
+
+    /** Publishes the number of rows written so far to every vector and to the root. */
+    @Override
+    public void finishBatch() {
+      final int vectorCount = root.getFieldVectors().size();
+      for (int pos = 0; pos < vectorCount; pos++) {
+        root.getVector(pos).setValueCount(nextRowIndex);
+      }
+      root.setRowCount(nextRowIndex);
+    }
+
+    /** Rewinds the batch position so the next write starts at index 0. */
+    @Override
+    public void reset() {
+      nextRowIndex = 0;
+    }
+  }
+
+  /**
+   * Wraps {@code row} with the Hudi metadata: instant time, a sequence id generated from
+   * {@code recordCount}, record key, partition path and file name.
+   */
+  private RowData updateRecordMetadata(RowData row, HoodieKey key, long recordCount) {
+    String sequenceId = seqIdGenerator.apply(recordCount);
+    return HoodieRowDataCreation.create(
+        instantTime, sequenceId, key.getRecordKey(), key.getPartitionPath(), fileName, row, withOperation, true);
+  }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index b2b1d9c058cf..8dfb06872e9c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -61,15 +60,4 @@ public class HoodieRowDataParquetWriteSupport extends
RowDataParquetWriteSupport
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}
-
- private static class HoodieBloomFilterRowDataWriteSupport extends
HoodieBloomFilterWriteSupport<String> {
- public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
- super(bloomFilter);
- }
-
- @Override
- protected byte[] getUTF8Bytes(String key) {
- return StringUtils.getUTF8Bytes(key);
- }
- }
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java
new file mode 100644
index 000000000000..2a9ecc5b0145
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Tests for {@link HoodieFlinkLanceArrowUtils}.
+ */
+public class TestHoodieFlinkLanceArrowUtils {
+
+ @Test
+ public void testTimestampSchemaRoundTripPreservesLocalTimezone() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new TimestampType(6), new
LocalZonedTimestampType(6)},
+ new String[] {"timestamp", "local_timestamp"});
+
+ RowType roundTripped = HoodieFlinkLanceArrowUtils.toRowType(
+ HoodieFlinkLanceArrowUtils.toArrowSchema(rowType));
+
+ assertInstanceOf(TimestampType.class, roundTripped.getTypeAt(0));
+ assertInstanceOf(LocalZonedTimestampType.class, roundTripped.getTypeAt(1));
+ }
+
+ @Test
+ public void testTimestampWriteHonorsUtcTimestampFlag() {
+ TimestampData timestampData = TimestampData.fromEpochMillis(1234L, 567000);
+ GenericRowData rowData = GenericRowData.of(timestampData);
+
+ try (BufferAllocator allocator = new RootAllocator();
+ TimeStampMicroVector vector = new TimeStampMicroVector(
+ "ts",
+ FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND,
null)),
+ allocator)) {
+ HoodieFlinkLanceArrowUtils.writeValue(new TimestampType(6), vector, 0,
rowData, 0, true);
+ assertEquals(1234567L, vector.get(0));
+
+ HoodieFlinkLanceArrowUtils.writeValue(new TimestampType(6), vector, 1,
rowData, 0, false);
+ assertEquals(timestampData.toTimestamp().getTime() * 1000L,
vector.get(1));
+ }
+ }
+
+  /**
+   * A negative microsecond value must be split with floor semantics:
+   * -1_234_567 us is -1235 ms plus 433_000 ns.
+   */
+  @Test
+  public void testTimestampReadNormalizesPreEpochMicros() {
+    final RowType singleTimestamp = RowType.of(new LogicalType[] {new TimestampType(6)}, new String[] {"ts"});
+    final FieldType microsNoZone = FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null));
+
+    try (BufferAllocator allocator = new RootAllocator();
+         TimeStampMicroVector vector = new TimeStampMicroVector("ts", microsNoZone, allocator)) {
+      vector.setSafe(0, -1_234_567L);
+      vector.setValueCount(1);
+
+      RowData row = HoodieFlinkLanceArrowUtils.toRowData(singleTimestamp, Collections.singletonList(vector), 0);
+
+      TimestampData expected = TimestampData.fromEpochMillis(-1235L, 433000);
+      assertEquals(expected, row.getTimestamp(0, 6));
+    }
+  }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 2f2ada47731a..ae8dbb1e963b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -86,9 +87,9 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
StoragePath path = new
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
setupTableOptions(conf.get(FlinkOptions.PATH), conf);
- checkBaseFileFormat(conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
+ checkBaseFileFormatForRead(conf, schema);
return new HoodieTableSource(
SerializableSchema.create(schema),
path,
@@ -116,11 +117,6 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
private void setupTableOptions(String basePath, Configuration conf) {
StreamerUtil.getTableConfig(basePath,
HadoopConfigurations.getHadoopConf(conf))
.ifPresent(tableConfig -> {
- // Guard: reject Lance from existing table config
(hoodie.properties); checkBaseFileFormat() handles user-supplied config
separately
- if (tableConfig.contains(HoodieTableConfig.BASE_FILE_FORMAT)
- &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(tableConfig.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
{
- throw new
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
- }
if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
&& !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
conf.set(FlinkOptions.RECORD_KEY_FIELD,
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
@@ -177,8 +173,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
- checkBaseFileFormat(conf);
checkTableType(conf);
+ checkBaseFileFormatForWrite(conf, schema);
checkIndexType(conf);
if (!OptionsResolver.isAppendMode(conf)) {
@@ -220,15 +216,41 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
}
/**
- * Validate the base file format. Lance is only supported with the Spark
engine.
+ * Validate the base file format. Flink Lance support is scoped to
append-only COW tables.
*/
- private void checkBaseFileFormat(Configuration conf) {
- String baseFileFormat =
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
- if (baseFileFormat != null &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
- throw new
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ private void checkBaseFileFormatForRead(Configuration conf, ResolvedSchema
schema) {
+ checkLanceBaseFileFormat(conf, schema);
+ }
+
+ private void checkBaseFileFormatForWrite(Configuration conf, ResolvedSchema
schema) {
+ checkLanceBaseFileFormat(conf, schema);
+ if (isLanceBaseFileFormat(conf) && !OptionsResolver.isAppendMode(conf)) {
+ throw new HoodieValidationException("Flink Lance base-file writes
require append-only INSERT mode. Set '"
+ + FlinkOptions.OPERATION.key() + "' = 'insert'.");
+ }
+ }
+
+ private void checkLanceBaseFileFormat(Configuration conf, ResolvedSchema
schema) {
+ if (!isLanceBaseFileFormat(conf)) {
+ return;
+ }
+ if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()) ||
schema.getPrimaryKey().isPresent()) {
+ throw new HoodieValidationException("Flink Lance base-file support is
only available for append-only tables without primary keys.");
+ }
+ if (OptionsResolver.isMorTable(conf)) {
+ throw new HoodieValidationException("Flink Lance base-file support is
only available for COPY_ON_WRITE append-only tables.");
+ }
+ if (OptionsResolver.isSchemaEvolutionEnabled(conf)) {
+ throw new HoodieValidationException("Flink Lance base-file support does
not support schema evolution. Set '"
+ + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() + "' = 'false'.");
}
}
+ private boolean isLanceBaseFileFormat(Configuration conf) {
+ String baseFileFormat =
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
+ return baseFileFormat != null &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat);
+ }
+
/**
* Validate the table type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 2015244e24ce..130f94e59bf1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.source.ExpressionPredicates;
@@ -98,18 +99,30 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
HoodieSchema dataSchema,
HoodieSchema requiredSchema,
HoodieStorage storage) throws IOException {
- if
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
- throw new
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
- }
boolean isLogFile = FSUtils.isLogFile(filePath);
// disable schema evolution in fileReader if it's log file, since schema
evolution for log file is handled in `FileGroupRecordBuffer`
InternalSchemaManager schemaManager = isLogFile ?
InternalSchemaManager.DISABLED : internalSchemaManager.get();
+ if
(filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+ if (schemaManager != InternalSchemaManager.DISABLED) {
+ throw new HoodieValidationException("Flink Lance base-file support
does not support schema evolution.");
+ }
+ HoodieRowDataLanceReader rowDataLanceReader =
+ (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+ .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE,
Option.empty());
+ try {
+ return
rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
requiredSchema);
+ } catch (RuntimeException e) {
+ rowDataLanceReader.close();
+ throw new HoodieException("Failed to get iterator from lance reader",
e);
+ }
+ }
+ DataType rowType =
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
HoodieRowDataParquetReader rowDataParquetReader =
(HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
- DataType rowType =
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, getSafePredicates(requiredSchema));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
index 2e7401d4caa0..3ad64bd3972a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.HoodieStorage;
@@ -35,4 +36,9 @@ public class HoodieRowDataFileReaderFactory extends
HoodieFileReaderFactory {
protected HoodieFileReader newParquetFileReader(StoragePath path) {
return new HoodieRowDataParquetReader(storage, path);
}
+
+ @Override
+ protected HoodieFileReader newLanceFileReader(HoodieConfig hoodieConfig,
StoragePath path) {
+ return new HoodieRowDataLanceReader(path, hoodieConfig);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
new file mode 100644
index 000000000000..8c7564af6730
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.memory.HoodieArrowAllocator;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.row.HoodieFlinkLanceArrowUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataQueryContexts;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.lance.file.LanceFileReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Lance reader for Flink RowData base files.
+ */
+public class HoodieRowDataLanceReader implements HoodieFileReader<RowData> {
+
+ private static final int DEFAULT_BATCH_SIZE = 512;
+
+ private final StoragePath path;
+ private final long dataAllocatorSize;
+ private final BufferAllocator metadataAllocator;
+ private final LanceFileReader metadataReader;
+ private final Schema arrowSchema;
+ private boolean closed;
+
+ public HoodieRowDataLanceReader(StoragePath path, HoodieConfig hoodieConfig)
{
+ this.path = path;
+ this.dataAllocatorSize =
hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES);
+ this.metadataAllocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-metadata-" + path.getName(),
+
hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES));
+ try {
+ this.metadataReader = LanceFileReader.open(path.toString(),
metadataAllocator);
+ this.arrowSchema = metadataReader.schema();
+ } catch (Exception e) {
+ close();
+ throw new HoodieException("Failed to create Lance reader for: " + path,
e);
+ }
+ }
+
+ @Override
+ public String[] readMinMaxRecordKeys() {
+ Map<String, String> metadata = arrowSchema.getCustomMetadata();
+ if (metadata != null) {
+ String minKey = metadata.get(HOODIE_MIN_RECORD_KEY_FOOTER);
+ String maxKey = metadata.get(HOODIE_MAX_RECORD_KEY_FOOTER);
+ if (minKey != null && maxKey != null) {
+ return new String[] {minKey, maxKey};
+ }
+ }
+ throw new HoodieException("Could not read min/max record key out of Lance
file: " + path);
+ }
+
+ @Override
+ public BloomFilter readBloomFilter() {
+ Map<String, String> metadata = arrowSchema.getCustomMetadata();
+ if (metadata == null ||
!metadata.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) {
+ return null;
+ }
+ String bloomSer = metadata.get(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+ String filterType = metadata.get(HOODIE_BLOOM_FILTER_TYPE_CODE);
+ if (filterType != null &&
filterType.contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+ return new HoodieDynamicBoundedBloomFilter(bloomSer);
+ }
+ return new SimpleBloomFilter(bloomSer);
+ }
+
+ @Override
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+ throw new HoodieException("Filtering row keys from Lance files is not
supported for Flink append-only tables without primary keys: " + path);
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<RowData>>
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema)
throws IOException {
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(RowDataQueryContexts.fromSchema(requestedSchema).getRowType(),
requestedSchema);
+ return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
+ }
+
+ @Override
+ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
+ HoodieSchema schema = HoodieSchemaUtils.getRecordKeySchema();
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(RowDataQueryContexts.fromSchema(schema).getRowType(),
schema);
+ return new CloseableMappingIterator<>(rowDataItr, rowData ->
rowData.getString(0).toString());
+ }
+
+ public ClosableIterator<RowData> getRowDataIterator(DataType dataType,
HoodieSchema requestedSchema) {
+ RowType rowType = (RowType) dataType.getLogicalType();
+ List<String> columnNames = new ArrayList<>(rowType.getFieldCount());
+ for (RowType.RowField field : rowType.getFields()) {
+ columnNames.add(field.getName());
+ }
+ BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+ getClass().getSimpleName() + "-data-" + path.getName(),
dataAllocatorSize);
+ LanceFileReader lanceReader = null;
+ ArrowReader arrowReader = null;
+ try {
+ lanceReader = LanceFileReader.open(path.toString(), allocator);
+ arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE);
+ return new LanceRowDataIterator(allocator, lanceReader, arrowReader,
rowType, this);
+ } catch (Exception e) {
+ if (arrowReader != null) {
+ try {
+ arrowReader.close();
+ } catch (Exception closeException) {
+ e.addSuppressed(closeException);
+ }
+ }
+ if (lanceReader != null) {
+ try {
+ lanceReader.close();
+ } catch (Exception closeException) {
+ e.addSuppressed(closeException);
+ }
+ }
+ allocator.close();
+ throw new HoodieException("Failed to create Lance row iterator for: " +
path, e);
+ }
+ }
+
+ @Override
+ public HoodieSchema getSchema() {
+ RowType rowType = HoodieFlinkLanceArrowUtils.toRowType(arrowSchema);
+ return HoodieSchemaConverter.convertToSchema(rowType);
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (metadataReader != null) {
+ try {
+ metadataReader.close();
+ } catch (Exception e) {
+ // ignore close failure; readers surface data-path exceptions earlier
+ }
+ }
+ if (metadataAllocator != null) {
+ metadataAllocator.close();
+ }
+ }
+
+ @Override
+ public long getTotalRecords() {
+ try {
+ return metadataReader.numRows();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to read row count from Lance file: " +
path, e);
+ }
+ }
+
+ private static class LanceRowDataIterator implements
ClosableIterator<RowData> {
+ private final BufferAllocator allocator;
+ private final LanceFileReader lanceReader;
+ private final ArrowReader arrowReader;
+ private final RowType rowType;
+ private final HoodieRowDataLanceReader reader;
+ private VectorSchemaRoot batch;
+ private List<FieldVector> orderedVectors;
+ private int rowId;
+ private boolean hasNext;
+ private boolean closed;
+
+ private LanceRowDataIterator(
+ BufferAllocator allocator,
+ LanceFileReader lanceReader,
+ ArrowReader arrowReader,
+ RowType rowType,
+ HoodieRowDataLanceReader reader) {
+ this.allocator = allocator;
+ this.lanceReader = lanceReader;
+ this.arrowReader = arrowReader;
+ this.rowType = rowType;
+ this.reader = reader;
+ loadNextBatch();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ @Override
+ public RowData next() {
+ RowData rowData = HoodieFlinkLanceArrowUtils.toRowData(rowType,
orderedVectors, rowId++);
+ if (rowId >= batch.getRowCount()) {
+ loadNextBatch();
+ }
+ return rowData;
+ }
+
+ private void loadNextBatch() {
+ try {
+ do {
+ hasNext = arrowReader.loadNextBatch();
+ if (hasNext) {
+ batch = arrowReader.getVectorSchemaRoot();
+ orderedVectors = orderVectors(rowType, batch.getFieldVectors());
+ rowId = 0;
+ }
+ } while (hasNext && batch.getRowCount() == 0);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read Lance batch", e);
+ }
+ }
+
+ private static List<FieldVector> orderVectors(RowType rowType,
List<FieldVector> vectors) {
+ Map<String, FieldVector> vectorsByName = new HashMap<>();
+ for (FieldVector vector : vectors) {
+ vectorsByName.put(vector.getName(), vector);
+ }
+ List<FieldVector> orderedVectors = new
ArrayList<>(rowType.getFieldCount());
+ for (RowType.RowField field : rowType.getFields()) {
+ FieldVector vector = vectorsByName.get(field.getName());
+ if (vector == null) {
+ throw new HoodieException("Missing Lance column in projected batch:
" + field.getName());
+ }
+ orderedVectors.add(vector);
+ }
+ return orderedVectors;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ arrowReader.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close Lance Arrow reader", e);
+ } finally {
+ try {
+ lanceReader.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close Lance reader", e);
+ } finally {
+ try {
+ allocator.close();
+ } finally {
+ reader.close();
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 5a2e94b833ba..9e59e1ed3344 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,12 +18,19 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.HoodieRowDataLanceReader;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.FileInputFormat;
@@ -33,6 +40,7 @@ import
org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
@@ -116,32 +124,50 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
@Override
public void open(FileInputSplit fileSplit) throws IOException {
- LinkedHashMap<String, Object> partObjects =
FilePathUtils.generatePartitionSpecs(
- fileSplit.getPath().getPath(),
- Arrays.asList(fullFieldNames),
- Arrays.asList(fullFieldTypes),
- this.partDefaultName,
- this.partPathField,
- this.hiveStylePartitioning
- );
-
- this.itr = RecordIterators.getParquetRecordIterator(
- internalSchemaManager,
- utcTimestamp,
- true,
- conf.conf(),
- fullFieldNames,
- fullFieldTypes,
- partObjects,
- selectedFields,
- 2048,
- fileSplit.getPath(),
- fileSplit.getStart(),
- fileSplit.getLength(),
- predicates);
+ if
(fileSplit.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension()))
{
+ this.itr = getLanceRecordIterator(fileSplit.getPath());
+ } else {
+ LinkedHashMap<String, Object> partObjects =
FilePathUtils.generatePartitionSpecs(
+ fileSplit.getPath().getPath(),
+ Arrays.asList(fullFieldNames),
+ Arrays.asList(fullFieldTypes),
+ this.partDefaultName,
+ this.partPathField,
+ this.hiveStylePartitioning
+ );
+ this.itr = RecordIterators.getParquetRecordIterator(
+ internalSchemaManager,
+ utcTimestamp,
+ true,
+ conf.conf(),
+ fullFieldNames,
+ fullFieldTypes,
+ partObjects,
+ selectedFields,
+ 2048,
+ fileSplit.getPath(),
+ fileSplit.getStart(),
+ fileSplit.getLength(),
+ predicates);
+ }
this.currentReadCount = 0L;
}
+ private ClosableIterator<RowData> getLanceRecordIterator(Path path) {
+ DataType selectedDataType = DataTypes.ROW(Arrays.stream(selectedFields)
+ .mapToObj(i -> DataTypes.FIELD(fullFieldNames[i],
fullFieldTypes[i]))
+ .toArray(DataTypes.Field[]::new))
+ .bridgedTo(RowData.class);
+ HoodieSchema requestedSchema =
HoodieSchemaConverter.convertToSchema(selectedDataType.getLogicalType());
+ HoodieRowDataLanceReader reader = new HoodieRowDataLanceReader(new
StoragePath(path.toString()), StreamerUtil.getLanceReadConfig(conf.conf()));
+ try {
+ return reader.getRowDataIterator(selectedDataType, requestedSchema);
+ } catch (RuntimeException e) {
+ reader.close();
+ throw new HoodieException("Failed to get iterator from lance reader", e);
+ }
+ }
+
@Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws
IOException {
if (minNumSplits < 1) {
@@ -379,6 +405,10 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
}
private boolean testForUnsplittable(FileStatus pathFile) {
+ if
(pathFile.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension()))
{
+ unsplittable = true;
+ return true;
+ }
if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
unsplittable = true;
return true;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index f88d1a76a325..c17e0528e458 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -25,7 +25,9 @@ import
org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
@@ -275,6 +277,22 @@ public class StreamerUtil {
return properties;
}
+ /**
+ * Builds a Lance read config from storage options carried in the Hadoop
configuration.
+ */
+ public static HoodieConfig
getLanceReadConfig(org.apache.hadoop.conf.Configuration conf) {
+ HoodieConfig hoodieConfig = new HoodieConfig();
+ String dataAllocatorSize =
conf.get(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.key());
+ if (dataAllocatorSize != null) {
+
hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES,
dataAllocatorSize);
+ }
+ String metadataAllocatorSize =
conf.get(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES.key());
+ if (metadataAllocatorSize != null) {
+
hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES,
metadataAllocatorSize);
+ }
+ return hoodieConfig;
+ }
+
public static void initTableFromClientIfNecessary(Configuration conf) {
// Since Flink 2.0, the adaptive execution for batch job will generate job
graph incrementally
// for multiple stages (FLIP-469). And the write coordinator is
initialized along with write
@@ -318,6 +336,7 @@ public class StreamerUtil {
.setTableName(conf.get(FlinkOptions.TABLE_NAME))
.setTableVersion(conf.get(FlinkOptions.WRITE_TABLE_VERSION))
.setTableFormat(conf.get(FlinkOptions.WRITE_TABLE_FORMAT))
+
.setBaseFileFormat(conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(),
null))
.setRecordMergeMode(getMergeMode(conf))
.setRecordMergeStrategyId(getMergeStrategyId(conf))
.setPayloadClassName(getPayloadClass(conf))
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 5d6a5daa061e..1cc5beaad418 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -65,7 +64,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.ExceptionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -1360,31 +1358,27 @@ public class ITTestHoodieDataSource {
}
@Test
- void testLanceFormatRejectedByFlink() {
- // Lance base file format is only supported with the Spark engine.
- // Flink should reject it early with a clear error on both read and write
paths.
- String createLanceTable = sql("lance_t1")
+ void testLanceFormatAppendOnlyWriteAndRead() {
+ String createHoodieTable = sql("lance_t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
- .options(getDefaultKeys())
+ .option(FlinkOptions.OPERATION, "insert")
.option("hoodie.table.base.file.format", "LANCE")
.end();
+ batchTableEnv.executeSql(createHoodieTable);
- // Creating the table itself succeeds (DDL is just metadata registration),
- // but any attempt to read or write should fail.
- // Flink wraps our HoodieValidationException in its own
ValidationException.
- batchTableEnv.executeSql(createLanceTable);
+ execInsertSql(batchTableEnv, "insert into lance_t1 values "
+ + "('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),"
+ + "('id2', 'Bob', 31, TIMESTAMP '1970-01-01 00:00:02', 'par2')");
- // Source (read) path should throw
- ValidationException readEx = assertThrows(ValidationException.class,
- () -> execSelectSql(batchTableEnv, "select * from lance_t1"),
- "Lance format should be rejected when reading via Flink");
- assertTrue(ExceptionUtils.findThrowableWithMessage(readEx,
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+ List<Row> rows = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select uuid, name, age, ts, `partition` from
lance_t1").collect());
+ assertRowsEquals(rows,
+ "[+I[id1, Alice, 23, 1970-01-01T00:00:01, par1], "
+ + "+I[id2, Bob, 31, 1970-01-01T00:00:02, par2]]");
- // Sink (write) path should throw
- ValidationException writeEx = assertThrows(ValidationException.class,
- () -> execInsertSql(batchTableEnv, "insert into lance_t1 values
('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')"),
- "Lance format should be rejected when writing via Flink");
- assertTrue(ExceptionUtils.findThrowableWithMessage(writeEx,
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+ List<Row> projectedRows = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select name, uuid from lance_t1").collect());
+ assertRowsEquals(projectedRows, "[+I[Alice, id1], +I[Bob, id2]]");
}
@ParameterizedTest
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 7a8ce8dcd80a..84e1136bc296 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
@@ -57,6 +58,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -85,6 +87,12 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
private Configuration conf;
private Option<InstantRange> instantRangeOpt = Option.empty();
+ @BeforeAll
+ public static void setUpClass() {
+ // add the lance format when composition type is supported
+ supportedFileFormats = Collections.singletonList(HoodieFileFormat.PARQUET);
+ }
+
@BeforeEach
public void setup() {
conf = new Configuration();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f594e9575860..6aa6f20581c3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -18,10 +18,10 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
@@ -788,28 +788,60 @@ public class TestHoodieTableFactory {
}
@Test
- void testLanceFormatNotSupportedByFlink() {
- // Lance base file format is only supported with the Spark engine.
- // Both source and sink should reject it with a clear error message.
- this.conf.setString("hoodie.table.base.file.format", "LANCE");
- ResolvedSchema schema = SchemaBuilder.instance()
+ void testLanceFormatSupportedForAppendOnlyTables() {
+ Configuration lanceConf = new Configuration();
+ lanceConf.set(FlinkOptions.PATH, new File(tempFile,
"lance").getAbsolutePath());
+ lanceConf.set(FlinkOptions.TABLE_NAME, "lance_t1");
+ lanceConf.set(FlinkOptions.OPERATION, "insert");
+ lanceConf.setString("hoodie.table.base.file.format", "LANCE");
+ ResolvedSchema appendOnlySchema = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
+ .build();
+ final MockContext appendOnlyContext = MockContext.getInstance(lanceConf,
appendOnlySchema, "f2");
+
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(appendOnlyContext));
+
+ Configuration morConf = new Configuration(lanceConf);
+ morConf.set(FlinkOptions.TABLE_TYPE,
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ final MockContext morContext = MockContext.getInstance(morConf,
appendOnlySchema, "f2");
+ HoodieValidationException morEx =
assertThrows(HoodieValidationException.class,
+ () -> new HoodieTableFactory().createDynamicTableSink(morContext));
+ assertThat(morEx.getMessage(), is("Flink Lance base-file support is only
available for COPY_ON_WRITE append-only tables."));
+
+ Configuration upsertConf = new Configuration(lanceConf);
+ upsertConf.set(FlinkOptions.OPERATION, "upsert");
+ final MockContext upsertContext = MockContext.getInstance(upsertConf,
appendOnlySchema, "f2");
+ HoodieValidationException operationEx =
assertThrows(HoodieValidationException.class,
+ () -> new HoodieTableFactory().createDynamicTableSink(upsertContext));
+ assertThat(operationEx.getMessage(), is("Flink Lance base-file writes
require append-only INSERT mode. Set '"
+ + FlinkOptions.OPERATION.key() + "' = 'insert'."));
+
+ Configuration schemaEvolutionConf = new Configuration(lanceConf);
+
schemaEvolutionConf.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
"true");
+ final MockContext schemaEvolutionContext =
MockContext.getInstance(schemaEvolutionConf, appendOnlySchema, "f2");
+ HoodieValidationException schemaEvolutionEx =
assertThrows(HoodieValidationException.class,
+ () -> new
HoodieTableFactory().createDynamicTableSink(schemaEvolutionContext));
+ assertThat(schemaEvolutionEx.getMessage(), is("Flink Lance base-file
support does not support schema evolution. Set '"
+ + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() + "' = 'false'."));
+
+ ResolvedSchema primaryKeySchema = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
.primaryKey("f0")
.build();
- final MockContext context = MockContext.getInstance(this.conf, schema,
"f2");
-
- // Source path should throw
- HoodieValidationException sourceEx =
assertThrows(HoodieValidationException.class,
- () -> new HoodieTableFactory().createDynamicTableSource(context));
- assertThat(sourceEx.getMessage(),
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+ final MockContext primaryKeyContext = MockContext.getInstance(lanceConf,
primaryKeySchema, "f1");
+ HoodieValidationException primaryKeyEx =
assertThrows(HoodieValidationException.class,
+ () -> new
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
+ assertThat(primaryKeyEx.getMessage(), is("Flink Lance base-file support is
only available for append-only tables without primary keys."));
- // Sink path should throw
+ lanceConf.set(FlinkOptions.RECORD_KEY_FIELD, "f0");
+ final MockContext keyedContext = MockContext.getInstance(lanceConf,
appendOnlySchema, "f2");
HoodieValidationException sinkEx =
assertThrows(HoodieValidationException.class,
- () -> new HoodieTableFactory().createDynamicTableSink(context));
- assertThat(sinkEx.getMessage(),
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+ () -> new HoodieTableFactory().createDynamicTableSink(keyedContext));
+ assertThat(sinkEx.getMessage(), is("Flink Lance base-file support is only
available for append-only tables without primary keys."));
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 9934d8a4e160..379a769ec815 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
import org.apache.hudi.common.schema.HoodieSchema;
@@ -380,6 +381,35 @@ public class TestHoodieCatalog extends
BaseTestHoodieCatalog {
assertEquals("Primary key definition is missing", exception.getMessage());
}
+ @Test
+ public void testCreateAppendOnlyLanceTableWithoutPrimaryKey() throws
Exception {
+ ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE,
"tb_lance_append_only");
+ Map<String, String> lanceOptions = new HashMap<>(EXPECTED_OPTIONS);
+ lanceOptions.put(FlinkOptions.TABLE_TYPE.key(),
FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ lanceOptions.put(FlinkOptions.OPERATION.key(), "insert");
+ lanceOptions.put(FlinkOptions.PRE_COMBINE.key(), "false");
+ lanceOptions.put(HoodieTableConfig.BASE_FILE_FORMAT.key(),
HoodieFileFormat.LANCE.name());
+ ResolvedSchema appendOnlySchema = new ResolvedSchema(CREATE_COLUMNS,
Collections.emptyList(), null);
+ ResolvedCatalogTable lanceTable = new ResolvedCatalogTable(
+ CatalogUtils.createCatalogTable(
+ Schema.newBuilder().fromResolvedSchema(appendOnlySchema).build(),
+ Arrays.asList("partition"),
+ lanceOptions,
+ "test_lance_append_only"),
+ appendOnlySchema
+ );
+
+ catalog.createTable(tablePath, lanceTable, false);
+
+ assertTrue(catalog.tableExists(tablePath));
+ CatalogBaseTable actualTable = catalog.getTable(tablePath);
+
assertFalse(actualTable.getOptions().containsKey(TableOptionProperties.PK_COLUMNS));
+ HoodieTableMetaClient metaClient = createMetaClient(
+ new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new
Configuration())),
+ catalog.inferTablePath(catalogPathStr, tablePath));
+ assertThat(metaClient.getTableConfig().getBaseFileFormat(),
is(HoodieFileFormat.LANCE));
+ }
+
@Test
void testCreateTableWithPartitionBucketIndex() throws
TableAlreadyExistException, DatabaseNotExistException, IOException {
String rule = "regex";
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index 189e297024fb..751f63d6de75 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -44,7 +43,6 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -134,14 +132,6 @@ class TestFlinkRowDataReaderContext {
assertTrue(result.getBoolean(2));
}
- @Test
- void testLanceFormatThrowsInGetFileRecordIterator() {
- StoragePath lancePath = new
StoragePath("/tmp/test-table/partition/file.lance");
- UnsupportedOperationException ex =
assertThrows(UnsupportedOperationException.class,
- () -> readerContext.getFileRecordIterator(lancePath, 0, 100, SCHEMA,
SCHEMA, null));
- assertEquals(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG, ex.getMessage());
- }
-
private GenericRowData createBaseRow(int id, String name, boolean active) {
return GenericRowData.of(id, StringData.fromString(name), active);
}