This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bf6b44fcd1f feat(flink): add lance format for Flink append only table (#18741)
1bf6b44fcd1f is described below

commit 1bf6b44fcd1f0b682d2c246b620a0413f2ef432a
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 27 13:04:29 2026 +0800

    feat(flink): add lance format for Flink append only table (#18741)
---
 .../row/HoodieBloomFilterRowDataWriteSupport.java  |  22 +-
 .../io/storage/row/HoodieFlinkLanceArrowUtils.java | 305 +++++++++++++++++++++
 .../io/storage/row/HoodieRowDataCreateHandle.java  |   2 +-
 .../row/HoodieRowDataFileWriterFactory.java        |  55 ++++
 .../io/storage/row/HoodieRowDataLanceWriter.java   | 167 +++++++++++
 .../row/HoodieRowDataParquetWriteSupport.java      |  12 -
 .../row/TestHoodieFlinkLanceArrowUtils.java        |  95 +++++++
 .../org/apache/hudi/table/HoodieTableFactory.java  |  46 +++-
 .../table/format/FlinkRowDataReaderContext.java    |  21 +-
 .../format/HoodieRowDataFileReaderFactory.java     |   6 +
 .../table/format/HoodieRowDataLanceReader.java     | 301 ++++++++++++++++++++
 .../table/format/cow/CopyOnWriteInputFormat.java   |  76 +++--
 .../java/org/apache/hudi/util/StreamerUtil.java    |  19 ++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  36 +--
 .../table/TestHoodieFileGroupReaderOnFlink.java    |   8 +
 .../apache/hudi/table/TestHoodieTableFactory.java  |  62 ++++-
 .../hudi/table/catalog/TestHoodieCatalog.java      |  30 ++
 .../format/TestFlinkRowDataReaderContext.java      |  10 -
 18 files changed, 1164 insertions(+), 109 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
similarity index 59%
copy from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
copy to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
index 2e7401d4caa0..36d9e28be3e2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieBloomFilterRowDataWriteSupport.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.format;
+package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.StringUtils;
 
 /**
- * Factory methods to create RowData file reader.
+ * Bloom-filter footer support for Flink RowData base-file writers.
  */
-public class HoodieRowDataFileReaderFactory extends HoodieFileReaderFactory {
-  public HoodieRowDataFileReaderFactory(HoodieStorage storage) {
-    super(storage);
+class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
+
+  HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
+    super(bloomFilter);
   }
 
   @Override
-  protected HoodieFileReader newParquetFileReader(StoragePath path) {
-    return new HoodieRowDataParquetReader(storage, path);
+  protected byte[] getUTF8Bytes(String key) {
+    return StringUtils.getUTF8Bytes(key);
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java
new file mode 100644
index 000000000000..07171f615b46
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
+/**
+ * Primitive RowData/Arrow conversion helpers for Flink Lance base files.
+ */
+public final class HoodieFlinkLanceArrowUtils {
+
+  private HoodieFlinkLanceArrowUtils() {
+  }
+
+  public static Schema toArrowSchema(RowType rowType) {
+    List<Field> fields = new ArrayList<>(rowType.getFieldCount());
+    for (RowType.RowField field : rowType.getFields()) {
+      fields.add(toArrowField(field.getName(), field.getType()));
+    }
+    return new Schema(fields);
+  }
+
+  public static RowType toRowType(Schema schema) {
+    List<RowType.RowField> fields = new ArrayList<>(schema.getFields().size());
+    for (Field field : schema.getFields()) {
+      fields.add(new RowType.RowField(field.getName(), toLogicalType(field.getType())));
+    }
+    return new RowType(fields);
+  }
+
+  public static RowData toRowData(RowType rowType, List<FieldVector> vectors, int rowId) {
+    GenericRowData rowData = new GenericRowData(vectors.size());
+    for (int i = 0; i < vectors.size(); i++) {
+      FieldVector vector = vectors.get(i);
+      if (vector.isNull(rowId)) {
+        rowData.setField(i, null);
+      } else {
+        rowData.setField(i, readValue(rowType.getTypeAt(i), vector, rowId));
+      }
+    }
+    return rowData;
+  }
+
+  public static void writeValue(LogicalType type, FieldVector vector, int rowId, RowData rowData, int ordinal) {
+    writeValue(type, vector, rowId, rowData, ordinal, true);
+  }
+
+  public static void writeValue(LogicalType type, FieldVector vector, int rowId, RowData rowData, int ordinal, boolean utcTimestamp) {
+    if (rowData.isNullAt(ordinal)) {
+      vector.setNull(rowId);
+      return;
+    }
+    switch (type.getTypeRoot()) {
+      case BOOLEAN:
+        ((BitVector) vector).setSafe(rowId, rowData.getBoolean(ordinal) ? 1 : 0);
+        return;
+      case TINYINT:
+        ((TinyIntVector) vector).setSafe(rowId, rowData.getByte(ordinal));
+        return;
+      case SMALLINT:
+        ((SmallIntVector) vector).setSafe(rowId, rowData.getShort(ordinal));
+        return;
+      case INTEGER:
+        ((IntVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        return;
+      case DATE:
+        ((DateDayVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        return;
+      case TIME_WITHOUT_TIME_ZONE:
+        ((TimeMilliVector) vector).setSafe(rowId, rowData.getInt(ordinal));
+        return;
+      case BIGINT:
+        ((BigIntVector) vector).setSafe(rowId, rowData.getLong(ordinal));
+        return;
+      case FLOAT:
+        ((Float4Vector) vector).setSafe(rowId, rowData.getFloat(ordinal));
+        return;
+      case DOUBLE:
+        ((Float8Vector) vector).setSafe(rowId, rowData.getDouble(ordinal));
+        return;
+      case CHAR:
+      case VARCHAR:
+        ((VarCharVector) vector).setSafe(rowId, rowData.getString(ordinal).toBytes());
+        return;
+      case BINARY:
+      case VARBINARY:
+        ((VarBinaryVector) vector).setSafe(rowId, rowData.getBinary(ordinal));
+        return;
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) type;
+        DecimalData decimal = rowData.getDecimal(ordinal, decimalType.getPrecision(), decimalType.getScale());
+        ((DecimalVector) vector).setSafe(rowId, decimal.toBigDecimal());
+        return;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        TimestampData timestamp = rowData.getTimestamp(ordinal, getPrecision(type));
+        long micros = timestampToMicros(timestamp, getPrecision(type), utcTimestamp);
+        ((TimeStampMicroVector) vector).setSafe(rowId, micros);
+        return;
+      default:
+        throw unsupported(type);
+    }
+  }
+
+  private static Object readValue(LogicalType type, ValueVector vector, int rowId) {
+    switch (type.getTypeRoot()) {
+      case BOOLEAN:
+        return ((BitVector) vector).get(rowId) == 1;
+      case TINYINT:
+        return ((TinyIntVector) vector).get(rowId);
+      case SMALLINT:
+        return ((SmallIntVector) vector).get(rowId);
+      case INTEGER:
+        return ((IntVector) vector).get(rowId);
+      case DATE:
+        return ((DateDayVector) vector).get(rowId);
+      case TIME_WITHOUT_TIME_ZONE:
+        return ((TimeMilliVector) vector).get(rowId);
+      case BIGINT:
+        return ((BigIntVector) vector).get(rowId);
+      case FLOAT:
+        return ((Float4Vector) vector).get(rowId);
+      case DOUBLE:
+        return ((Float8Vector) vector).get(rowId);
+      case CHAR:
+      case VARCHAR:
+        return StringData.fromBytes(((VarCharVector) vector).get(rowId));
+      case BINARY:
+      case VARBINARY:
+        return ((VarBinaryVector) vector).get(rowId);
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) type;
+        BigDecimal decimal = ((DecimalVector) vector).getObject(rowId);
+        return DecimalData.fromBigDecimal(decimal, decimalType.getPrecision(), decimalType.getScale());
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        long micros = ((TimeStampMicroVector) vector).get(rowId);
+        return TimestampData.fromEpochMillis(Math.floorDiv(micros, 1000L), (int) Math.floorMod(micros, 1000L) * 1000);
+      default:
+        throw unsupported(type);
+    }
+  }
+
+  private static Field toArrowField(String name, LogicalType type) {
+    return new Field(name, FieldType.nullable(toArrowType(type)), Collections.emptyList());
+  }
+
+  private static ArrowType toArrowType(LogicalType type) {
+    switch (type.getTypeRoot()) {
+      case BOOLEAN:
+        return ArrowType.Bool.INSTANCE;
+      case TINYINT:
+        return new ArrowType.Int(8, true);
+      case SMALLINT:
+        return new ArrowType.Int(16, true);
+      case INTEGER:
+        return new ArrowType.Int(32, true);
+      case BIGINT:
+        return new ArrowType.Int(64, true);
+      case FLOAT:
+        return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case CHAR:
+      case VARCHAR:
+        return ArrowType.Utf8.INSTANCE;
+      case BINARY:
+      case VARBINARY:
+        return ArrowType.Binary.INSTANCE;
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      case TIME_WITHOUT_TIME_ZONE:
+        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) type;
+        return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128);
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
+      default:
+        throw unsupported(type);
+    }
+  }
+
+  private static LogicalType toLogicalType(ArrowType arrowType) {
+    if (arrowType instanceof ArrowType.Bool) {
+      return new BooleanType();
+    } else if (arrowType instanceof ArrowType.Int) {
+      ArrowType.Int intType = (ArrowType.Int) arrowType;
+      switch (intType.getBitWidth()) {
+        case 8:
+          return new TinyIntType();
+        case 16:
+          return new SmallIntType();
+        case 32:
+          return new IntType();
+        case 64:
+          return new BigIntType();
+        default:
+          throw new HoodieNotSupportedException("Unsupported Arrow int width for Lance Flink reader: " + intType.getBitWidth());
+      }
+    } else if (arrowType instanceof ArrowType.FloatingPoint) {
+      ArrowType.FloatingPoint fp = (ArrowType.FloatingPoint) arrowType;
+      return fp.getPrecision() == FloatingPointPrecision.SINGLE
+          ? new FloatType()
+          : new DoubleType();
+    } else if (arrowType instanceof ArrowType.Utf8) {
+      return new VarCharType();
+    } else if (arrowType instanceof ArrowType.Binary) {
+      return new VarBinaryType();
+    } else if (arrowType instanceof ArrowType.Date) {
+      return new DateType();
+    } else if (arrowType instanceof ArrowType.Time) {
+      return new TimeType();
+    } else if (arrowType instanceof ArrowType.Decimal) {
+      ArrowType.Decimal decimal = (ArrowType.Decimal) arrowType;
+      return new DecimalType(decimal.getPrecision(), decimal.getScale());
+    } else if (arrowType instanceof ArrowType.Timestamp) {
+      ArrowType.Timestamp timestamp = (ArrowType.Timestamp) arrowType;
+      return timestamp.getTimezone() == null
+          ? new TimestampType(6)
+          : new LocalZonedTimestampType(6);
+    }
+    throw new HoodieNotSupportedException("Unsupported Arrow type for Lance Flink reader: " + arrowType);
+  }
+
+  private static long timestampToMicros(TimestampData timestampData, int precision, boolean utcTimestamp) {
+    long millis = utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+    return precision > 3 && utcTimestamp
+        ? millis * 1000L + timestampData.getNanoOfMillisecond() / 1000L
+        : millis * 1000L;
+  }
+
+  private static HoodieNotSupportedException unsupported(LogicalType type) {
+    return new HoodieNotSupportedException("Flink Lance base-file support currently supports primitive append-only columns; unsupported type: " + type);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 45333cf4b5db..dbccd3975012 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -294,6 +294,6 @@ public class HoodieRowDataCreateHandle implements Serializable {
       throws IOException {
     StoragePath storagePath = new StoragePath(path.toUri());
     return (HoodieRowDataFileWriter) new HoodieRowDataFileWriterFactory(hoodieTable.getStorage())
-        .newParquetFileWriter(instantTime, storagePath, config, rowType, hoodieTable.getTaskContextSupplier());
+        .getFileWriter(instantTime, storagePath, config, rowType, hoodieTable.getTaskContextSupplier());
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index be3242164a40..49671d37f108 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -23,8 +23,10 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieParquetConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +37,7 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RowDataQueryContexts;
 
 import org.apache.flink.table.types.logical.RowType;
@@ -44,6 +47,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.util.ParquetUtils.getCompressionCodecName;
 
 /**
@@ -55,6 +62,30 @@ public class HoodieRowDataFileWriterFactory extends HoodieFileWriterFactory {
     super(storage);
   }
 
+  public HoodieFileWriter getFileWriter(String instantTime, StoragePath storagePath, HoodieWriteConfig config, RowType rowType,
+                                     TaskContextSupplier taskContextSupplier) throws IOException {
+    final String extension = FSUtils.getFileExtension(storagePath.getName());
+    return getFileWriterByFormat(extension, instantTime, storagePath, config, rowType, taskContextSupplier);
+  }
+
+  private  <T, I, K, O> HoodieFileWriter getFileWriterByFormat(
+      String extension, String instantTime, StoragePath path, HoodieConfig config, RowType rowType,
+      TaskContextSupplier taskContextSupplier) throws IOException {
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return newParquetFileWriter(instantTime, path, config, rowType, taskContextSupplier);
+    }
+    if (HFILE.getFileExtension().equals(extension)) {
+      return newHFileFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier);
+    }
+    if (ORC.getFileExtension().equals(extension)) {
+      return newOrcFileWriter(instantTime, path, config, HoodieSchemaConverter.convertToSchema(rowType), taskContextSupplier);
+    }
+    if (LANCE.getFileExtension().equals(extension)) {
+      return newLanceFileWriter(instantTime, path, config, rowType, taskContextSupplier);
+    }
+    throw new UnsupportedOperationException(extension + " format not supported yet.");
+  }
+
   /**
    * Create a parquet writer on a given OutputStream.
    *
@@ -136,6 +167,30 @@ public class HoodieRowDataFileWriterFactory extends HoodieFileWriterFactory {
         instantTime, taskContextSupplier, populateMetaFields, withOperation);
   }
 
+  public HoodieFileWriter newLanceFileWriter(
+      String instantTime,
+      StoragePath path,
+      HoodieConfig config,
+      RowType rowType,
+      TaskContextSupplier taskContextSupplier) {
+    boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
+    boolean withOperation = config.getBooleanOrDefault(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD);
+    Option<org.apache.hudi.common.bloom.BloomFilter> bloomFilter = enableBloomFilter(populateMetaFields, config)
+        ? Option.of(createBloomFilter(config)) : Option.empty();
+    return new HoodieRowDataLanceWriter(
+        path,
+        rowType,
+        instantTime,
+        taskContextSupplier,
+        bloomFilter,
+        config.getLongOrDefault(HoodieStorageConfig.LANCE_MAX_FILE_SIZE),
+        config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_ALLOCATOR_SIZE_BYTES),
+        config.getLongOrDefault(HoodieStorageConfig.LANCE_WRITE_FLUSH_BYTE_WATERMARK),
+        config.getBooleanOrDefault(HoodieStorageConfig.WRITE_UTC_TIMEZONE),
+        populateMetaFields,
+        withOperation);
+  }
+
   private static HoodieParquetConfig<HoodieRowDataParquetWriteSupport> getParquetConfig(
       HoodieConfig config, HoodieRowDataParquetWriteSupport writeSupport) {
     return new HoodieParquetConfig<>(
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
new file mode 100644
index 000000000000..f4388c23fc43
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.model.HoodieRowDataCreation;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Lance writer for Flink {@link RowData} append-only base files.
+ */
+public class HoodieRowDataLanceWriter extends HoodieBaseLanceWriter<RowData, String>
+    implements HoodieRowDataFileWriter {
+
+  private static final long MIN_RECORDS_FOR_SIZE_CHECK = 100L;
+  private static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000L;
+
+  private final RowType rowType;
+  private final Schema arrowSchema;
+  private final String fileName;
+  private final String instantTime;
+  private final long maxFileSize;
+  private final boolean utcTimestamp;
+  private final boolean populateMetaFields;
+  private final boolean withOperation;
+  private final Function<Long, String> seqIdGenerator;
+  private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK;
+
+  public HoodieRowDataLanceWriter(
+      StoragePath file,
+      RowType rowType,
+      String instantTime,
+      TaskContextSupplier taskContextSupplier,
+      Option<BloomFilter> bloomFilterOpt,
+      long maxFileSize,
+      long allocatorSize,
+      long flushByteWatermark,
+      boolean utcTimestamp,
+      boolean populateMetaFields,
+      boolean withOperation) {
+    super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark,
+        bloomFilterOpt.map(HoodieBloomFilterRowDataWriteSupport::new));
+    ValidationUtils.checkArgument(maxFileSize > 0, "maxFileSize must be a positive number");
+    ValidationUtils.checkArgument(allocatorSize > 0, "allocatorSize must be a positive number");
+    ValidationUtils.checkArgument(flushByteWatermark > 0, "flushByteWatermark must be a positive number");
+    ValidationUtils.checkArgument(flushByteWatermark < allocatorSize,
+        "flushByteWatermark (" + flushByteWatermark + ") must be less than allocatorSize ("
+            + allocatorSize + ")");
+    this.rowType = rowType;
+    this.arrowSchema = HoodieFlinkLanceArrowUtils.toArrowSchema(rowType);
+    this.fileName = file.getName();
+    this.instantTime = instantTime;
+    this.maxFileSize = maxFileSize;
+    this.utcTimestamp = utcTimestamp;
+    this.populateMetaFields = populateMetaFields;
+    this.withOperation = withOperation;
+    this.seqIdGenerator = recordIndex -> {
+      Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
+      return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
+    };
+  }
+
+  @Override
+  public boolean canWrite() {
+    long writtenCount = getWrittenRecordCount();
+    if (writtenCount >= recordCountForNextSizeCheck) {
+      long dataSize = getDataSize();
+      long avgRecordSize = Math.max(dataSize / writtenCount, 1);
+      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+        return false;
+      }
+      recordCountForNextSizeCheck = writtenCount + Math.min(
+          Math.max(MIN_RECORDS_FOR_SIZE_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
+          MAX_RECORDS_FOR_SIZE_CHECK);
+    }
+    return true;
+  }
+
+  @Override
+  public void writeRow(String key, RowData row) throws IOException {
+    bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> bloomFilterWriteSupport.addKey(key));
+    super.write(row);
+  }
+
+  @Override
+  public void writeRowWithMetaData(HoodieKey key, RowData row) throws IOException {
+    if (populateMetaFields) {
+      RowData rowWithMeta = updateRecordMetadata(row, key, getWrittenRecordCount());
+      writeRow(key.getRecordKey(), rowWithMeta);
+    } else {
+      writeRow(key.getRecordKey(), row);
+    }
+  }
+
+  @Override
+  protected ArrowWriter<RowData> createArrowWriter(VectorSchemaRoot root) {
+    return new RowDataArrowWriter(root);
+  }
+
+  @Override
+  protected Schema getArrowSchema() {
+    return arrowSchema;
+  }
+
+  private class RowDataArrowWriter implements ArrowWriter<RowData> {
+    private final VectorSchemaRoot root;
+    private int rowId;
+
+    private RowDataArrowWriter(VectorSchemaRoot root) {
+      this.root = root;
+    }
+
+    @Override
+    public void write(RowData row) {
+      for (int i = 0; i < rowType.getFieldCount(); i++) {
+        HoodieFlinkLanceArrowUtils.writeValue(rowType.getTypeAt(i), root.getVector(i), rowId, row, i, utcTimestamp);
+      }
+      rowId++;
+    }
+
+    @Override
+    public void finishBatch() {
+      root.getFieldVectors().forEach(vector -> vector.setValueCount(rowId));
+      root.setRowCount(rowId);
+    }
+
+    @Override
+    public void reset() {
+      rowId = 0;
+    }
+  }
+
+  private RowData updateRecordMetadata(RowData row, HoodieKey key, long recordCount) {
+    return HoodieRowDataCreation.create(instantTime, seqIdGenerator.apply(recordCount),
+        key.getRecordKey(), key.getPartitionPath(), fileName, row, withOperation, true);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index b2b1d9c058cf..8dfb06872e9c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -61,15 +60,4 @@ public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport
     this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
         bloomFilterWriteSupport.addKey(recordKey));
   }
-
-  private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
-    public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
-      super(bloomFilter);
-    }
-
-    @Override
-    protected byte[] getUTF8Bytes(String key) {
-      return StringUtils.getUTF8Bytes(key);
-    }
-  }
 }
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java
new file mode 100644
index 000000000000..2a9ecc5b0145
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieFlinkLanceArrowUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+/**
+ * Tests for {@link HoodieFlinkLanceArrowUtils}.
+ */
+public class TestHoodieFlinkLanceArrowUtils {
+
+  @Test
+  public void testTimestampSchemaRoundTripPreservesLocalTimezone() {
+    RowType rowType = RowType.of(
+        new LogicalType[] {new TimestampType(6), new 
LocalZonedTimestampType(6)},
+        new String[] {"timestamp", "local_timestamp"});
+
+    RowType roundTripped = HoodieFlinkLanceArrowUtils.toRowType(
+        HoodieFlinkLanceArrowUtils.toArrowSchema(rowType));
+
+    assertInstanceOf(TimestampType.class, roundTripped.getTypeAt(0));
+    assertInstanceOf(LocalZonedTimestampType.class, roundTripped.getTypeAt(1));
+  }
+
+  @Test
+  public void testTimestampWriteHonorsUtcTimestampFlag() {
+    TimestampData timestampData = TimestampData.fromEpochMillis(1234L, 567000);
+    GenericRowData rowData = GenericRowData.of(timestampData);
+
+    try (BufferAllocator allocator = new RootAllocator();
+         TimeStampMicroVector vector = new TimeStampMicroVector(
+             "ts",
+             FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, 
null)),
+             allocator)) {
+      HoodieFlinkLanceArrowUtils.writeValue(new TimestampType(6), vector, 0, 
rowData, 0, true);
+      assertEquals(1234567L, vector.get(0));
+
+      HoodieFlinkLanceArrowUtils.writeValue(new TimestampType(6), vector, 1, 
rowData, 0, false);
+      assertEquals(timestampData.toTimestamp().getTime() * 1000L, 
vector.get(1));
+    }
+  }
+
+  @Test
+  public void testTimestampReadNormalizesPreEpochMicros() {
+    try (BufferAllocator allocator = new RootAllocator();
+         TimeStampMicroVector vector = new TimeStampMicroVector(
+             "ts",
+             FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, 
null)),
+             allocator)) {
+      vector.setSafe(0, -1_234_567L);
+      vector.setValueCount(1);
+
+      RowData rowData = HoodieFlinkLanceArrowUtils.toRowData(
+          RowType.of(new LogicalType[] {new TimestampType(6)}, new String[] 
{"ts"}),
+          Collections.singletonList(vector),
+          0);
+
+      assertEquals(TimestampData.fromEpochMillis(-1235L, 433000), 
rowData.getTimestamp(0, 6));
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 2f2ada47731a..ae8dbb1e963b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -86,9 +87,9 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     StoragePath path = new 
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
     setupTableOptions(conf.get(FlinkOptions.PATH), conf);
-    checkBaseFileFormat(conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
+    checkBaseFileFormatForRead(conf, schema);
     return new HoodieTableSource(
         SerializableSchema.create(schema),
         path,
@@ -116,11 +117,6 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   private void setupTableOptions(String basePath, Configuration conf) {
     StreamerUtil.getTableConfig(basePath, 
HadoopConfigurations.getHadoopConf(conf))
         .ifPresent(tableConfig -> {
-          // Guard: reject Lance from existing table config 
(hoodie.properties); checkBaseFileFormat() handles user-supplied config 
separately
-          if (tableConfig.contains(HoodieTableConfig.BASE_FILE_FORMAT)
-              && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(tableConfig.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
 {
-            throw new 
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
-          }
           if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
               && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
             conf.set(FlinkOptions.RECORD_KEY_FIELD, 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
@@ -177,8 +173,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
-    checkBaseFileFormat(conf);
     checkTableType(conf);
+    checkBaseFileFormatForWrite(conf, schema);
     checkIndexType(conf);
 
     if (!OptionsResolver.isAppendMode(conf)) {
@@ -220,15 +216,41 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   }
 
   /**
-   * Validate the base file format. Lance is only supported with the Spark 
engine.
+   * Validate the base file format. Flink Lance support is scoped to 
append-only COW tables.
    */
-  private void checkBaseFileFormat(Configuration conf) {
-    String baseFileFormat = 
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
-    if (baseFileFormat != null && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
-      throw new 
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+  private void checkBaseFileFormatForRead(Configuration conf, ResolvedSchema 
schema) {
+    checkLanceBaseFileFormat(conf, schema);
+  }
+
+  private void checkBaseFileFormatForWrite(Configuration conf, ResolvedSchema 
schema) {
+    checkLanceBaseFileFormat(conf, schema);
+    if (isLanceBaseFileFormat(conf) && !OptionsResolver.isAppendMode(conf)) {
+      throw new HoodieValidationException("Flink Lance base-file writes 
require append-only INSERT mode. Set '"
+          + FlinkOptions.OPERATION.key() + "' = 'insert'.");
+    }
+  }
+
+  private void checkLanceBaseFileFormat(Configuration conf, ResolvedSchema 
schema) {
+    if (!isLanceBaseFileFormat(conf)) {
+      return;
+    }
+    if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()) || 
schema.getPrimaryKey().isPresent()) {
+      throw new HoodieValidationException("Flink Lance base-file support is 
only available for append-only tables without primary keys.");
+    }
+    if (OptionsResolver.isMorTable(conf)) {
+      throw new HoodieValidationException("Flink Lance base-file support is 
only available for COPY_ON_WRITE append-only tables.");
+    }
+    if (OptionsResolver.isSchemaEvolutionEnabled(conf)) {
+      throw new HoodieValidationException("Flink Lance base-file support does 
not support schema evolution. Set '"
+          + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() + "' = 'false'.");
     }
   }
 
+  private boolean isLanceBaseFileFormat(Configuration conf) {
+    String baseFileFormat = 
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
+    return baseFileFormat != null && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat);
+  }
+
   /**
    * Validate the table type.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 2015244e24ce..130f94e59bf1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.source.ExpressionPredicates;
@@ -98,18 +99,30 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       HoodieSchema dataSchema,
       HoodieSchema requiredSchema,
       HoodieStorage storage) throws IOException {
-    if 
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
-      throw new 
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
-    }
     boolean isLogFile = FSUtils.isLogFile(filePath);
     // disable schema evolution in fileReader if it's log file, since schema 
evolution for log file is handled in `FileGroupRecordBuffer`
     InternalSchemaManager schemaManager = isLogFile ? 
InternalSchemaManager.DISABLED : internalSchemaManager.get();
 
+    if 
(filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+      if (schemaManager != InternalSchemaManager.DISABLED) {
+        throw new HoodieValidationException("Flink Lance base-file support 
does not support schema evolution.");
+      }
+      HoodieRowDataLanceReader rowDataLanceReader =
+          (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
+              .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+              .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, 
Option.empty());
+      try {
+        return 
rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
 requiredSchema);
+      } catch (RuntimeException e) {
+        rowDataLanceReader.close();
+        throw new HoodieException("Failed to get iterator from lance reader", 
e);
+      }
+    }
+    DataType rowType = 
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
     HoodieRowDataParquetReader rowDataParquetReader =
         (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
             .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
-    DataType rowType = 
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
     return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, getSafePredicates(requiredSchema));
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
index 2e7401d4caa0..3ad64bd3972a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReaderFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
@@ -35,4 +36,9 @@ public class HoodieRowDataFileReaderFactory extends 
HoodieFileReaderFactory {
   protected HoodieFileReader newParquetFileReader(StoragePath path) {
     return new HoodieRowDataParquetReader(storage, path);
   }
+
+  @Override
+  protected HoodieFileReader newLanceFileReader(HoodieConfig hoodieConfig, 
StoragePath path) {
+    return new HoodieRowDataLanceReader(path, hoodieConfig);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
new file mode 100644
index 000000000000..8c7564af6730
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.bloom.SimpleBloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.memory.HoodieArrowAllocator;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.row.HoodieFlinkLanceArrowUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataQueryContexts;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.lance.file.LanceFileReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static 
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static 
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static 
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Lance reader for Flink RowData base files.
+ */
+public class HoodieRowDataLanceReader implements HoodieFileReader<RowData> {
+
+  private static final int DEFAULT_BATCH_SIZE = 512;
+
+  private final StoragePath path;
+  private final long dataAllocatorSize;
+  private final BufferAllocator metadataAllocator;
+  private final LanceFileReader metadataReader;
+  private final Schema arrowSchema;
+  private boolean closed;
+
+  public HoodieRowDataLanceReader(StoragePath path, HoodieConfig hoodieConfig) 
{
+    this.path = path;
+    this.dataAllocatorSize = 
hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES);
+    this.metadataAllocator = HoodieArrowAllocator.newChildAllocator(
+        getClass().getSimpleName() + "-metadata-" + path.getName(),
+        
hoodieConfig.getLongOrDefault(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES));
+    try {
+      this.metadataReader = LanceFileReader.open(path.toString(), 
metadataAllocator);
+      this.arrowSchema = metadataReader.schema();
+    } catch (Exception e) {
+      close();
+      throw new HoodieException("Failed to create Lance reader for: " + path, 
e);
+    }
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata != null) {
+      String minKey = metadata.get(HOODIE_MIN_RECORD_KEY_FOOTER);
+      String maxKey = metadata.get(HOODIE_MAX_RECORD_KEY_FOOTER);
+      if (minKey != null && maxKey != null) {
+        return new String[] {minKey, maxKey};
+      }
+    }
+    throw new HoodieException("Could not read min/max record key out of Lance 
file: " + path);
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<String, String> metadata = arrowSchema.getCustomMetadata();
+    if (metadata == null || 
!metadata.containsKey(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) {
+      return null;
+    }
+    String bloomSer = metadata.get(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+    String filterType = metadata.get(HOODIE_BLOOM_FILTER_TYPE_CODE);
+    if (filterType != null && 
filterType.contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+      return new HoodieDynamicBoundedBloomFilter(bloomSer);
+    }
+    return new SimpleBloomFilter(bloomSer);
+  }
+
+  @Override
+  public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+    throw new HoodieException("Filtering row keys from Lance files is not 
supported for Flink append-only tables without primary keys: " + path);
+  }
+
+  @Override
+  public ClosableIterator<HoodieRecord<RowData>> 
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) 
throws IOException {
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(RowDataQueryContexts.fromSchema(requestedSchema).getRowType(),
 requestedSchema);
+    return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
+  }
+
+  @Override
+  public ClosableIterator<String> getRecordKeyIterator() throws IOException {
+    HoodieSchema schema = HoodieSchemaUtils.getRecordKeySchema();
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(RowDataQueryContexts.fromSchema(schema).getRowType(), 
schema);
+    return new CloseableMappingIterator<>(rowDataItr, rowData -> 
rowData.getString(0).toString());
+  }
+
+  public ClosableIterator<RowData> getRowDataIterator(DataType dataType, 
HoodieSchema requestedSchema) {
+    RowType rowType = (RowType) dataType.getLogicalType();
+    List<String> columnNames = new ArrayList<>(rowType.getFieldCount());
+    for (RowType.RowField field : rowType.getFields()) {
+      columnNames.add(field.getName());
+    }
+    BufferAllocator allocator = HoodieArrowAllocator.newChildAllocator(
+        getClass().getSimpleName() + "-data-" + path.getName(), 
dataAllocatorSize);
+    LanceFileReader lanceReader = null;
+    ArrowReader arrowReader = null;
+    try {
+      lanceReader = LanceFileReader.open(path.toString(), allocator);
+      arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE);
+      return new LanceRowDataIterator(allocator, lanceReader, arrowReader, 
rowType, this);
+    } catch (Exception e) {
+      if (arrowReader != null) {
+        try {
+          arrowReader.close();
+        } catch (Exception closeException) {
+          e.addSuppressed(closeException);
+        }
+      }
+      if (lanceReader != null) {
+        try {
+          lanceReader.close();
+        } catch (Exception closeException) {
+          e.addSuppressed(closeException);
+        }
+      }
+      allocator.close();
+      throw new HoodieException("Failed to create Lance row iterator for: " + 
path, e);
+    }
+  }
+
+  @Override
+  public HoodieSchema getSchema() {
+    RowType rowType = HoodieFlinkLanceArrowUtils.toRowType(arrowSchema);
+    return HoodieSchemaConverter.convertToSchema(rowType);
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (metadataReader != null) {
+      try {
+        metadataReader.close();
+      } catch (Exception e) {
+        // ignore close failure; readers surface data-path exceptions earlier
+      }
+    }
+    if (metadataAllocator != null) {
+      metadataAllocator.close();
+    }
+  }
+
+  @Override
+  public long getTotalRecords() {
+    try {
+      return metadataReader.numRows();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read row count from Lance file: " + 
path, e);
+    }
+  }
+
+  private static class LanceRowDataIterator implements 
ClosableIterator<RowData> {
+    private final BufferAllocator allocator;
+    private final LanceFileReader lanceReader;
+    private final ArrowReader arrowReader;
+    private final RowType rowType;
+    private final HoodieRowDataLanceReader reader;
+    private VectorSchemaRoot batch;
+    private List<FieldVector> orderedVectors;
+    private int rowId;
+    private boolean hasNext;
+    private boolean closed;
+
+    private LanceRowDataIterator(
+        BufferAllocator allocator,
+        LanceFileReader lanceReader,
+        ArrowReader arrowReader,
+        RowType rowType,
+        HoodieRowDataLanceReader reader) {
+      this.allocator = allocator;
+      this.lanceReader = lanceReader;
+      this.arrowReader = arrowReader;
+      this.rowType = rowType;
+      this.reader = reader;
+      loadNextBatch();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    @Override
+    public RowData next() {
+      RowData rowData = HoodieFlinkLanceArrowUtils.toRowData(rowType, 
orderedVectors, rowId++);
+      if (rowId >= batch.getRowCount()) {
+        loadNextBatch();
+      }
+      return rowData;
+    }
+
+    private void loadNextBatch() {
+      try {
+        do {
+          hasNext = arrowReader.loadNextBatch();
+          if (hasNext) {
+            batch = arrowReader.getVectorSchemaRoot();
+            orderedVectors = orderVectors(rowType, batch.getFieldVectors());
+            rowId = 0;
+          }
+        } while (hasNext && batch.getRowCount() == 0);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to read Lance batch", e);
+      }
+    }
+
+    private static List<FieldVector> orderVectors(RowType rowType, 
List<FieldVector> vectors) {
+      Map<String, FieldVector> vectorsByName = new HashMap<>();
+      for (FieldVector vector : vectors) {
+        vectorsByName.put(vector.getName(), vector);
+      }
+      List<FieldVector> orderedVectors = new 
ArrayList<>(rowType.getFieldCount());
+      for (RowType.RowField field : rowType.getFields()) {
+        FieldVector vector = vectorsByName.get(field.getName());
+        if (vector == null) {
+          throw new HoodieException("Missing Lance column in projected batch: 
" + field.getName());
+        }
+        orderedVectors.add(vector);
+      }
+      return orderedVectors;
+    }
+
+    @Override
+    public void close() {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      try {
+        arrowReader.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to close Lance Arrow reader", e);
+      } finally {
+        try {
+          lanceReader.close();
+        } catch (Exception e) {
+          throw new HoodieException("Failed to close Lance reader", e);
+        } finally {
+          try {
+            allocator.close();
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 5a2e94b833ba..9e59e1ed3344 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -18,12 +18,19 @@
 
 package org.apache.hudi.table.format.cow;
 
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.HoodieRowDataLanceReader;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -33,6 +40,7 @@ import 
org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.hadoop.conf.Configuration;
@@ -116,32 +124,50 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
 
   @Override
   public void open(FileInputSplit fileSplit) throws IOException {
-    LinkedHashMap<String, Object> partObjects = 
FilePathUtils.generatePartitionSpecs(
-        fileSplit.getPath().getPath(),
-        Arrays.asList(fullFieldNames),
-        Arrays.asList(fullFieldTypes),
-        this.partDefaultName,
-        this.partPathField,
-        this.hiveStylePartitioning
-    );
-
-    this.itr = RecordIterators.getParquetRecordIterator(
-        internalSchemaManager,
-        utcTimestamp,
-        true,
-        conf.conf(),
-        fullFieldNames,
-        fullFieldTypes,
-        partObjects,
-        selectedFields,
-        2048,
-        fileSplit.getPath(),
-        fileSplit.getStart(),
-        fileSplit.getLength(),
-        predicates);
+    if 
(fileSplit.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension()))
 {
+      this.itr = getLanceRecordIterator(fileSplit.getPath());
+    } else {
+      LinkedHashMap<String, Object> partObjects = 
FilePathUtils.generatePartitionSpecs(
+          fileSplit.getPath().getPath(),
+          Arrays.asList(fullFieldNames),
+          Arrays.asList(fullFieldTypes),
+          this.partDefaultName,
+          this.partPathField,
+          this.hiveStylePartitioning
+      );
+      this.itr = RecordIterators.getParquetRecordIterator(
+          internalSchemaManager,
+          utcTimestamp,
+          true,
+          conf.conf(),
+          fullFieldNames,
+          fullFieldTypes,
+          partObjects,
+          selectedFields,
+          2048,
+          fileSplit.getPath(),
+          fileSplit.getStart(),
+          fileSplit.getLength(),
+          predicates);
+    }
     this.currentReadCount = 0L;
   }
 
+  private ClosableIterator<RowData> getLanceRecordIterator(Path path) {
+    DataType selectedDataType = DataTypes.ROW(Arrays.stream(selectedFields)
+            .mapToObj(i -> DataTypes.FIELD(fullFieldNames[i], 
fullFieldTypes[i]))
+            .toArray(DataTypes.Field[]::new))
+        .bridgedTo(RowData.class);
+    HoodieSchema requestedSchema = 
HoodieSchemaConverter.convertToSchema(selectedDataType.getLogicalType());
+    HoodieRowDataLanceReader reader = new HoodieRowDataLanceReader(new 
StoragePath(path.toString()), StreamerUtil.getLanceReadConfig(conf.conf()));
+    try {
+      return reader.getRowDataIterator(selectedDataType, requestedSchema);
+    } catch (RuntimeException e) {
+      reader.close();
+      throw new HoodieException("Failed to get iterator from lance reader", e);
+    }
+  }
+
   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) throws 
IOException {
     if (minNumSplits < 1) {
@@ -379,6 +405,10 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
   }
 
   private boolean testForUnsplittable(FileStatus pathFile) {
+    if 
(pathFile.getPath().getName().endsWith(HoodieFileFormat.LANCE.getFileExtension()))
 {
+      unsplittable = true;
+      return true;
+    }
     if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
       unsplittable = true;
       return true;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index f88d1a76a325..c17e0528e458 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -25,7 +25,9 @@ import 
org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
@@ -275,6 +277,22 @@ public class StreamerUtil {
     return properties;
   }
 
+  /**
+   * Builds a Lance read config holding the data and metadata read allocator sizes 
that are set in the Hadoop configuration; unset options are left out.
+   */
+  public static HoodieConfig 
getLanceReadConfig(org.apache.hadoop.conf.Configuration conf) {
+    HoodieConfig hoodieConfig = new HoodieConfig();
+    String dataAllocatorSize = 
conf.get(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.key());
+    if (dataAllocatorSize != null) {
+      
hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES, 
dataAllocatorSize);
+    }
+    String metadataAllocatorSize = 
conf.get(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES.key());
+    if (metadataAllocatorSize != null) {
+      
hoodieConfig.setValue(HoodieStorageConfig.LANCE_READ_METADATA_ALLOCATOR_SIZE_BYTES,
 metadataAllocatorSize);
+    }
+    return hoodieConfig;
+  }
+
   public static void initTableFromClientIfNecessary(Configuration conf) {
     // Since Flink 2.0, the adaptive execution for batch job will generate job 
graph incrementally
     // for multiple stages (FLIP-469). And the write coordinator is 
initialized along with write
@@ -318,6 +336,7 @@ public class StreamerUtil {
           .setTableName(conf.get(FlinkOptions.TABLE_NAME))
           .setTableVersion(conf.get(FlinkOptions.WRITE_TABLE_VERSION))
           .setTableFormat(conf.get(FlinkOptions.WRITE_TABLE_FORMAT))
+          
.setBaseFileFormat(conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), 
null))
           .setRecordMergeMode(getMergeMode(conf))
           .setRecordMergeStrategyId(getMergeStrategyId(conf))
           .setPayloadClassName(getPayloadClass(conf))
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 5d6a5daa061e..1cc5beaad418 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -65,7 +64,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.ExceptionUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -1360,31 +1358,27 @@ public class ITTestHoodieDataSource {
   }
 
   @Test
-  void testLanceFormatRejectedByFlink() {
-    // Lance base file format is only supported with the Spark engine.
-    // Flink should reject it early with a clear error on both read and write 
paths.
-    String createLanceTable = sql("lance_t1")
+  void testLanceFormatAppendOnlyWriteAndRead() {
+    String createHoodieTable = sql("lance_t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .options(getDefaultKeys())
+        .option(FlinkOptions.OPERATION, "insert")
         .option("hoodie.table.base.file.format", "LANCE")
         .end();
+    batchTableEnv.executeSql(createHoodieTable);
 
-    // Creating the table itself succeeds (DDL is just metadata registration),
-    // but any attempt to read or write should fail.
-    // Flink wraps our HoodieValidationException in its own 
ValidationException.
-    batchTableEnv.executeSql(createLanceTable);
+    execInsertSql(batchTableEnv, "insert into lance_t1 values "
+        + "('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),"
+        + "('id2', 'Bob', 31, TIMESTAMP '1970-01-01 00:00:02', 'par2')");
 
-    // Source (read) path should throw
-    ValidationException readEx = assertThrows(ValidationException.class,
-        () -> execSelectSql(batchTableEnv, "select * from lance_t1"),
-        "Lance format should be rejected when reading via Flink");
-    assertTrue(ExceptionUtils.findThrowableWithMessage(readEx, 
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+    List<Row> rows = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select uuid, name, age, ts, `partition` from 
lance_t1").collect());
+    assertRowsEquals(rows,
+        "[+I[id1, Alice, 23, 1970-01-01T00:00:01, par1], "
+            + "+I[id2, Bob, 31, 1970-01-01T00:00:02, par2]]");
 
-    // Sink (write) path should throw
-    ValidationException writeEx = assertThrows(ValidationException.class,
-        () -> execInsertSql(batchTableEnv, "insert into lance_t1 values 
('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')"),
-        "Lance format should be rejected when writing via Flink");
-    assertTrue(ExceptionUtils.findThrowableWithMessage(writeEx, 
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+    List<Row> projectedRows = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select name, uuid from lance_t1").collect());
+    assertRowsEquals(projectedRows, "[+I[Alice, id1], +I[Bob, id2]]");
   }
 
   @ParameterizedTest
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 7a8ce8dcd80a..84e1136bc296 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -57,6 +58,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -85,6 +87,12 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
   private Configuration conf;
   private Option<InstantRange> instantRangeOpt = Option.empty();
 
+  @BeforeAll
+  public static void setUpClass() {
+    // Only PARQUET is listed for now. TODO: add LANCE once "composition type" (presumably composite/nested types; confirm) is supported.
+    supportedFileFormats = Collections.singletonList(HoodieFileFormat.PARQUET);
+  }
+
   @BeforeEach
   public void setup() {
     conf = new Configuration();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f594e9575860..6aa6f20581c3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -18,10 +18,10 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -788,28 +788,60 @@ public class TestHoodieTableFactory {
   }
 
   @Test
-  void testLanceFormatNotSupportedByFlink() {
-    // Lance base file format is only supported with the Spark engine.
-    // Both source and sink should reject it with a clear error message.
-    this.conf.setString("hoodie.table.base.file.format", "LANCE");
-    ResolvedSchema schema = SchemaBuilder.instance()
+  void testLanceFormatSupportedForAppendOnlyTables() {
+    Configuration lanceConf = new Configuration();
+    lanceConf.set(FlinkOptions.PATH, new File(tempFile, 
"lance").getAbsolutePath());
+    lanceConf.set(FlinkOptions.TABLE_NAME, "lance_t1");
+    lanceConf.set(FlinkOptions.OPERATION, "insert");
+    lanceConf.setString("hoodie.table.base.file.format", "LANCE");
+    ResolvedSchema appendOnlySchema = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext appendOnlyContext = MockContext.getInstance(lanceConf, 
appendOnlySchema, "f2");
+
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(appendOnlyContext));
+
+    Configuration morConf = new Configuration(lanceConf);
+    morConf.set(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    final MockContext morContext = MockContext.getInstance(morConf, 
appendOnlySchema, "f2");
+    HoodieValidationException morEx = 
assertThrows(HoodieValidationException.class,
+        () -> new HoodieTableFactory().createDynamicTableSink(morContext));
+    assertThat(morEx.getMessage(), is("Flink Lance base-file support is only 
available for COPY_ON_WRITE append-only tables."));
+
+    Configuration upsertConf = new Configuration(lanceConf);
+    upsertConf.set(FlinkOptions.OPERATION, "upsert");
+    final MockContext upsertContext = MockContext.getInstance(upsertConf, 
appendOnlySchema, "f2");
+    HoodieValidationException operationEx = 
assertThrows(HoodieValidationException.class,
+        () -> new HoodieTableFactory().createDynamicTableSink(upsertContext));
+    assertThat(operationEx.getMessage(), is("Flink Lance base-file writes 
require append-only INSERT mode. Set '"
+        + FlinkOptions.OPERATION.key() + "' = 'insert'."));
+
+    Configuration schemaEvolutionConf = new Configuration(lanceConf);
+    
schemaEvolutionConf.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
"true");
+    final MockContext schemaEvolutionContext = 
MockContext.getInstance(schemaEvolutionConf, appendOnlySchema, "f2");
+    HoodieValidationException schemaEvolutionEx = 
assertThrows(HoodieValidationException.class,
+        () -> new 
HoodieTableFactory().createDynamicTableSink(schemaEvolutionContext));
+    assertThat(schemaEvolutionEx.getMessage(), is("Flink Lance base-file 
support does not support schema evolution. Set '"
+        + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() + "' = 'false'."));
+
+    ResolvedSchema primaryKeySchema = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
         .primaryKey("f0")
         .build();
-    final MockContext context = MockContext.getInstance(this.conf, schema, 
"f2");
-
-    // Source path should throw
-    HoodieValidationException sourceEx = 
assertThrows(HoodieValidationException.class,
-        () -> new HoodieTableFactory().createDynamicTableSource(context));
-    assertThat(sourceEx.getMessage(), 
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+    final MockContext primaryKeyContext = MockContext.getInstance(lanceConf, 
primaryKeySchema, "f1");
+    HoodieValidationException primaryKeyEx = 
assertThrows(HoodieValidationException.class,
+        () -> new 
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
+    assertThat(primaryKeyEx.getMessage(), is("Flink Lance base-file support is 
only available for append-only tables without primary keys."));
 
-    // Sink path should throw
+    lanceConf.set(FlinkOptions.RECORD_KEY_FIELD, "f0");
+    final MockContext keyedContext = MockContext.getInstance(lanceConf, 
appendOnlySchema, "f2");
     HoodieValidationException sinkEx = 
assertThrows(HoodieValidationException.class,
-        () -> new HoodieTableFactory().createDynamicTableSink(context));
-    assertThat(sinkEx.getMessage(), 
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+        () -> new HoodieTableFactory().createDynamicTableSink(keyedContext));
+    assertThat(sinkEx.getMessage(), is("Flink Lance base-file support is only 
available for append-only tables without primary keys."));
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 9934d8a4e160..379a769ec815 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -380,6 +381,35 @@ public class TestHoodieCatalog extends 
BaseTestHoodieCatalog {
     assertEquals("Primary key definition is missing", exception.getMessage());
   }
 
+  @Test
+  public void testCreateAppendOnlyLanceTableWithoutPrimaryKey() throws 
Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, 
"tb_lance_append_only");
+    Map<String, String> lanceOptions = new HashMap<>(EXPECTED_OPTIONS);
+    lanceOptions.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    lanceOptions.put(FlinkOptions.OPERATION.key(), "insert");
+    lanceOptions.put(FlinkOptions.PRE_COMBINE.key(), "false");
+    lanceOptions.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), 
HoodieFileFormat.LANCE.name());
+    ResolvedSchema appendOnlySchema = new ResolvedSchema(CREATE_COLUMNS, 
Collections.emptyList(), null);
+    ResolvedCatalogTable lanceTable = new ResolvedCatalogTable(
+        CatalogUtils.createCatalogTable(
+            Schema.newBuilder().fromResolvedSchema(appendOnlySchema).build(),
+            Arrays.asList("partition"),
+            lanceOptions,
+            "test_lance_append_only"),
+        appendOnlySchema
+    );
+
+    catalog.createTable(tablePath, lanceTable, false);
+
+    assertTrue(catalog.tableExists(tablePath));
+    CatalogBaseTable actualTable = catalog.getTable(tablePath);
+    
assertFalse(actualTable.getOptions().containsKey(TableOptionProperties.PK_COLUMNS));
+    HoodieTableMetaClient metaClient = createMetaClient(
+        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())),
+        catalog.inferTablePath(catalogPathStr, tablePath));
+    assertThat(metaClient.getTableConfig().getBaseFileFormat(), 
is(HoodieFileFormat.LANCE));
+  }
+
   @Test
   void testCreateTableWithPartitionBucketIndex() throws 
TableAlreadyExistException, DatabaseNotExistException, IOException {
     String rule = "regex";
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index 189e297024fb..751f63d6de75 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -44,7 +43,6 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -134,14 +132,6 @@ class TestFlinkRowDataReaderContext {
     assertTrue(result.getBoolean(2));
   }
 
-  @Test
-  void testLanceFormatThrowsInGetFileRecordIterator() {
-    StoragePath lancePath = new 
StoragePath("/tmp/test-table/partition/file.lance");
-    UnsupportedOperationException ex = 
assertThrows(UnsupportedOperationException.class,
-        () -> readerContext.getFileRecordIterator(lancePath, 0, 100, SCHEMA, 
SCHEMA, null));
-    assertEquals(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG, ex.getMessage());
-  }
-
   private GenericRowData createBaseRow(int id, String name, boolean active) {
     return GenericRowData.of(id, StringData.fromString(name), active);
   }

Reply via email to