This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f996abe491 feat(lance): throwing exception/guard for users trying to 
read Lance from non-spark engines (#18481)
65f996abe491 is described below

commit 65f996abe4912ad872234cc995e9f38966dd63bb
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Apr 8 20:53:43 2026 +0700

    feat(lance): throwing exception/guard for users trying to read Lance from 
non-spark engines (#18481)
    
    * feat(lance): throwing exception/guard for users trying to read Lance from 
non-spark engines
    
    * fixed per review notes
    
    ---------
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../apache/hudi/common/model/HoodieFileFormat.java |  4 +++
 .../hudi/io/storage/HoodieFileReaderFactory.java   |  2 +-
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  2 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  | 18 ++++++++++++++
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  4 +++
 .../table/format/FlinkRowDataReaderContext.java    |  3 +++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 29 ++++++++++++++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 26 +++++++++++++++++++
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 13 ++++++++++
 .../format/TestFlinkRowDataReaderContext.java      | 10 ++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  3 +++
 11 files changed, 112 insertions(+), 2 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index a4913ae55c5d..dfbb7b8d858e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -55,6 +55,10 @@ public enum HoodieFileFormat {
           + "and designed for ML and AI workloads")
   LANCE(".lance");
 
+  public static final String LANCE_SPARK_ONLY_ERROR_MSG =
+      "Lance base file format is currently only supported with the Spark 
engine. "
+          + "Please use Parquet, ORC, or HFile for non-Spark engines (Flink, 
Hive, Presto, Trino).";
+
   public static final Set<String> BASE_FILE_EXTENSIONS = 
Arrays.stream(HoodieFileFormat.values())
       .map(HoodieFileFormat::getFileExtension)
       .filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 4f0d1d2adf16..21420c1b4de1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -134,7 +134,7 @@ public class HoodieFileReaderFactory {
   }
 
   protected HoodieFileReader newLanceFileReader(StoragePath path) {
-    throw new UnsupportedOperationException();
+    throw new 
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
   }
 
   public HoodieFileReader newBootstrapFileReader(HoodieFileReader 
skeletonFileReader,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 66e647fadddf..895bcb891e19 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -114,7 +114,7 @@ public class HoodieFileWriterFactory {
   protected HoodieFileWriter newLanceFileWriter(
       String instantTime, StoragePath path, HoodieConfig config, HoodieSchema 
schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
-    throw new UnsupportedOperationException();
+    throw new 
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
   }
 
   public static BloomFilter createBloomFilter(HoodieConfig config) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index fa703b2eef1a..a08f3f56aacb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -85,6 +86,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     StoragePath path = new 
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
     setupTableOptions(conf.get(FlinkOptions.PATH), conf);
+    checkBaseFileFormat(conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
     return new HoodieTableSource(
@@ -114,6 +116,11 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   private void setupTableOptions(String basePath, Configuration conf) {
     StreamerUtil.getTableConfig(basePath, 
HadoopConfigurations.getHadoopConf(conf))
         .ifPresent(tableConfig -> {
+          // Guard: reject Lance from existing table config 
(hoodie.properties); checkBaseFileFormat() handles user-supplied config 
separately
+          if (tableConfig.contains(HoodieTableConfig.BASE_FILE_FORMAT)
+              && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(tableConfig.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
 {
+            throw new 
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+          }
           if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
               && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
             conf.set(FlinkOptions.RECORD_KEY_FIELD, 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
@@ -170,6 +177,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+    checkBaseFileFormat(conf);
     checkTableType(conf);
     checkIndexType(conf);
 
@@ -201,6 +209,16 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     }
   }
 
+  /**
+   * Validate the base file format. Lance is only supported with the Spark 
engine.
+   */
+  private void checkBaseFileFormat(Configuration conf) {
+    String baseFileFormat = 
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
+    if (baseFileFormat != null && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
+      throw new 
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+    }
+  }
+
   /**
    * Validate the table type.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 0ac03678711f..3caeb9565afb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -637,6 +637,10 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
 
     HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+    String baseFileFormatStr = 
properties.get(HoodieTableConfig.BASE_FILE_FORMAT.key());
+    if (baseFileFormatStr != null && 
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormatStr)) {
+      throw new 
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+    }
     //ignore uber input Format
     String inputFormatClassName = 
HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, 
useRealTimeInputFormat);
     String outputFormatClassName = 
HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 991f099d8981..2015244e24ce 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -98,6 +98,9 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       HoodieSchema dataSchema,
       HoodieSchema requiredSchema,
       HoodieStorage storage) throws IOException {
+    if 
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+      throw new 
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+    }
     boolean isLogFile = FSUtils.isLogFile(filePath);
     // disable schema evolution in fileReader if it's log file, since schema 
evolution for log file is handled in `FileGroupRecordBuffer`
     InternalSchemaManager schemaManager = isLogFile ? 
InternalSchemaManager.DISABLED : internalSchemaManager.get();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 84688add5700..60aa113eea50 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
@@ -1360,6 +1361,34 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows2, "[]");
   }
 
+  @Test
+  void testLanceFormatRejectedByFlink() {
+    // Lance base file format is only supported with the Spark engine.
+    // Flink should reject it early with a clear error on both read and write 
paths.
+    String createLanceTable = sql("lance_t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option("hoodie.table.base.file.format", "LANCE")
+        .end();
+
+    // Creating the table itself succeeds (DDL is just metadata registration),
+    // but any attempt to read or write should fail.
+    // Flink wraps our HoodieValidationException in its own 
ValidationException.
+    batchTableEnv.executeSql(createLanceTable);
+
+    // Source (read) path should throw
+    ValidationException readEx = assertThrows(ValidationException.class,
+        () -> execSelectSql(batchTableEnv, "select * from lance_t1"),
+        "Lance format should be rejected when reading via Flink");
+    assertTrue(ExceptionUtils.findThrowableWithMessage(readEx, 
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+
+    // Sink (write) path should throw
+    ValidationException writeEx = assertThrows(ValidationException.class,
+        () -> execInsertSql(batchTableEnv, "insert into lance_t1 values 
('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')"),
+        "Lance format should be rejected when writing via Flink");
+    assertTrue(ExceptionUtils.findThrowableWithMessage(writeEx, 
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 50bebdb3737c..4d29503208dc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -746,6 +747,31 @@ public class TestHoodieTableFactory {
     assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
   }
 
+  @Test
+  void testLanceFormatNotSupportedByFlink() {
+    // Lance base file format is only supported with the Spark engine.
+    // Both source and sink should reject it with a clear error message.
+    this.conf.setString("hoodie.table.base.file.format", "LANCE");
+    ResolvedSchema schema = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockContext context = MockContext.getInstance(this.conf, schema, 
"f2");
+
+    // Source path should throw
+    HoodieValidationException sourceEx = 
assertThrows(HoodieValidationException.class,
+        () -> new HoodieTableFactory().createDynamicTableSource(context));
+    assertThat(sourceEx.getMessage(), 
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+
+    // Sink path should throw
+    HoodieValidationException sinkEx = 
assertThrows(HoodieValidationException.class,
+        () -> new HoodieTableFactory().createDynamicTableSink(context));
+    assertThat(sinkEx.getMessage(), 
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index d1ac72550644..7f67a0281e5c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -391,6 +391,19 @@ public class TestHoodieHiveCatalog extends 
BaseTestHoodieCatalog {
     assertThrows(HoodieCatalogException.class, () -> 
hoodieCatalog.createTable(tablePath, table, false));
   }
 
+  @Test
+  public void testLanceFormatNotSupportedByHiveCatalog() {
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    properties.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE");
+    CatalogTable table = CatalogUtils.createCatalogTable(schema, 
Collections.emptyList(), properties, "lance table");
+    // createTable wraps the HoodieValidationException in a 
HoodieCatalogException
+    HoodieCatalogException ex = assertThrows(HoodieCatalogException.class,
+        () -> hoodieCatalog.createTable(tablePath, table, false));
+    assertThat(ex.getCause(), instanceOf(HoodieValidationException.class));
+    assertThat(ex.getCause().getMessage(), containsString("Lance base file 
format is currently only supported with the Spark engine"));
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testDropTable(boolean external) throws 
TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, 
IOException {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index 751f63d6de75..189e297024fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -43,6 +44,7 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -132,6 +134,14 @@ class TestFlinkRowDataReaderContext {
     assertTrue(result.getBoolean(2));
   }
 
+  @Test
+  void testLanceFormatThrowsInGetFileRecordIterator() {
+    StoragePath lancePath = new 
StoragePath("/tmp/test-table/partition/file.lance");
+    UnsupportedOperationException ex = 
assertThrows(UnsupportedOperationException.class,
+        () -> readerContext.getFileRecordIterator(lancePath, 0, 100, SCHEMA, 
SCHEMA, null));
+    assertEquals(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG, ex.getMessage());
+  }
+
   private GenericRowData createBaseRow(int id, String name, boolean active) {
     return GenericRowData.of(id, StringData.fromString(name), active);
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index a86818547c3e..9e7994f0147b 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -127,6 +127,9 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
 
   private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath 
filePath, String[] hosts, long start, long length, HoodieSchema dataSchema,
                                                                 HoodieSchema 
requiredSchema, HoodieStorage storage) throws IOException {
+    if 
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+      throw new 
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+    }
     // mdt file schema irregular and does not work with this logic. Also, log 
file evolution is handled inside the log block
     boolean isParquetOrOrc = 
filePath.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())
         || 
filePath.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension());

Reply via email to