This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 65f996abe491 feat(lance): throwing exception/guard for users trying to
read Lance from non-spark engines (#18481)
65f996abe491 is described below
commit 65f996abe4912ad872234cc995e9f38966dd63bb
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Apr 8 20:53:43 2026 +0700
feat(lance): throwing exception/guard for users trying to read Lance from
non-spark engines (#18481)
* feat(lance): throwing exception/guard for users trying to read Lance from
non-spark engines
* fixed by review notes
---------
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../apache/hudi/common/model/HoodieFileFormat.java | 4 +++
.../hudi/io/storage/HoodieFileReaderFactory.java | 2 +-
.../hudi/io/storage/HoodieFileWriterFactory.java | 2 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 18 ++++++++++++++
.../hudi/table/catalog/HoodieHiveCatalog.java | 4 +++
.../table/format/FlinkRowDataReaderContext.java | 3 +++
.../apache/hudi/table/ITTestHoodieDataSource.java | 29 ++++++++++++++++++++++
.../apache/hudi/table/TestHoodieTableFactory.java | 26 +++++++++++++++++++
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 13 ++++++++++
.../format/TestFlinkRowDataReaderContext.java | 10 ++++++++
.../hudi/hadoop/HiveHoodieReaderContext.java | 3 +++
11 files changed, 112 insertions(+), 2 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index a4913ae55c5d..dfbb7b8d858e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -55,6 +55,10 @@ public enum HoodieFileFormat {
+ "and designed for ML and AI workloads")
LANCE(".lance");
+ public static final String LANCE_SPARK_ONLY_ERROR_MSG =
+ "Lance base file format is currently only supported with the Spark
engine. "
+ + "Please use Parquet, ORC, or HFile for non-Spark engines (Flink,
Hive, Presto, Trino).";
+
public static final Set<String> BASE_FILE_EXTENSIONS =
Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension)
.filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 4f0d1d2adf16..21420c1b4de1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -134,7 +134,7 @@ public class HoodieFileReaderFactory {
}
protected HoodieFileReader newLanceFileReader(StoragePath path) {
- throw new UnsupportedOperationException();
+ throw new
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
}
public HoodieFileReader newBootstrapFileReader(HoodieFileReader
skeletonFileReader,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 66e647fadddf..895bcb891e19 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -114,7 +114,7 @@ public class HoodieFileWriterFactory {
protected HoodieFileWriter newLanceFileWriter(
String instantTime, StoragePath path, HoodieConfig config, HoodieSchema
schema,
TaskContextSupplier taskContextSupplier) throws IOException {
- throw new UnsupportedOperationException();
+ throw new
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
}
public static BloomFilter createBloomFilter(HoodieConfig config) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index fa703b2eef1a..a08f3f56aacb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -85,6 +86,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
StoragePath path = new
StoragePath(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
setupTableOptions(conf.get(FlinkOptions.PATH), conf);
+ checkBaseFileFormat(conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
return new HoodieTableSource(
@@ -114,6 +116,11 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
private void setupTableOptions(String basePath, Configuration conf) {
StreamerUtil.getTableConfig(basePath,
HadoopConfigurations.getHadoopConf(conf))
.ifPresent(tableConfig -> {
+ // Guard: reject Lance from existing table config
(hoodie.properties); checkBaseFileFormat() handles user-supplied config
separately
+ if (tableConfig.contains(HoodieTableConfig.BASE_FILE_FORMAT)
+ &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(tableConfig.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
{
+ throw new
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ }
if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
&& !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
conf.set(FlinkOptions.RECORD_KEY_FIELD,
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
@@ -170,6 +177,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+ checkBaseFileFormat(conf);
checkTableType(conf);
checkIndexType(conf);
@@ -201,6 +209,16 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
}
}
+ /**
+ * Validate the base file format. Lance is only supported with the Spark
engine.
+ */
+ private void checkBaseFileFormat(Configuration conf) {
+ String baseFileFormat =
conf.getString(HoodieTableConfig.BASE_FILE_FORMAT.key(), null);
+ if (baseFileFormat != null &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormat)) {
+ throw new
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ }
+ }
+
/**
* Validate the table type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 0ac03678711f..3caeb9565afb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -637,6 +637,10 @@ public class HoodieHiveCatalog extends AbstractCatalog {
}
HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+ String baseFileFormatStr =
properties.get(HoodieTableConfig.BASE_FILE_FORMAT.key());
+ if (baseFileFormatStr != null &&
HoodieFileFormat.LANCE.name().equalsIgnoreCase(baseFileFormatStr)) {
+ throw new
HoodieValidationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ }
//ignore uber input Format
String inputFormatClassName =
HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat,
useRealTimeInputFormat);
String outputFormatClassName =
HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 991f099d8981..2015244e24ce 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -98,6 +98,9 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
HoodieSchema dataSchema,
HoodieSchema requiredSchema,
HoodieStorage storage) throws IOException {
+ if
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+ throw new
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ }
boolean isLogFile = FSUtils.isLogFile(filePath);
// disable schema evolution in fileReader if it's log file, since schema
evolution for log file is handled in `FileGroupRecordBuffer`
InternalSchemaManager schemaManager = isLogFile ?
InternalSchemaManager.DISABLED : internalSchemaManager.get();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 84688add5700..60aa113eea50 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
@@ -1360,6 +1361,34 @@ public class ITTestHoodieDataSource {
assertRowsEquals(rows2, "[]");
}
+ @Test
+ void testLanceFormatRejectedByFlink() {
+ // Lance base file format is only supported with the Spark engine.
+ // Flink should reject it early with a clear error on both read and write
paths.
+ String createLanceTable = sql("lance_t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option("hoodie.table.base.file.format", "LANCE")
+ .end();
+
+ // Creating the table itself succeeds (DDL is just metadata registration),
+ // but any attempt to read or write should fail.
+ // Flink wraps our HoodieValidationException in its own
ValidationException.
+ batchTableEnv.executeSql(createLanceTable);
+
+ // Source (read) path should throw
+ ValidationException readEx = assertThrows(ValidationException.class,
+ () -> execSelectSql(batchTableEnv, "select * from lance_t1"),
+ "Lance format should be rejected when reading via Flink");
+ assertTrue(ExceptionUtils.findThrowableWithMessage(readEx,
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+
+ // Sink (write) path should throw
+ ValidationException writeEx = assertThrows(ValidationException.class,
+ () -> execInsertSql(batchTableEnv, "insert into lance_t1 values
('id1', 'Alice', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')"),
+ "Lance format should be rejected when writing via Flink");
+ assertTrue(ExceptionUtils.findThrowableWithMessage(writeEx,
HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG).isPresent());
+ }
+
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 50bebdb3737c..4d29503208dc 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
@@ -746,6 +747,31 @@ public class TestHoodieTableFactory {
assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
}
+ @Test
+ void testLanceFormatNotSupportedByFlink() {
+ // Lance base file format is only supported with the Spark engine.
+ // Both source and sink should reject it with a clear error message.
+ this.conf.setString("hoodie.table.base.file.format", "LANCE");
+ ResolvedSchema schema = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+ final MockContext context = MockContext.getInstance(this.conf, schema,
"f2");
+
+ // Source path should throw
+ HoodieValidationException sourceEx =
assertThrows(HoodieValidationException.class,
+ () -> new HoodieTableFactory().createDynamicTableSource(context));
+ assertThat(sourceEx.getMessage(),
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+
+ // Sink path should throw
+ HoodieValidationException sinkEx =
assertThrows(HoodieValidationException.class,
+ () -> new HoodieTableFactory().createDynamicTableSink(context));
+ assertThat(sinkEx.getMessage(),
is(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG));
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index d1ac72550644..7f67a0281e5c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -391,6 +391,19 @@ public class TestHoodieHiveCatalog extends
BaseTestHoodieCatalog {
assertThrows(HoodieCatalogException.class, () ->
hoodieCatalog.createTable(tablePath, table, false));
}
+ @Test
+ public void testLanceFormatNotSupportedByHiveCatalog() {
+ HashMap<String, String> properties = new HashMap<>();
+ properties.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ properties.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE");
+ CatalogTable table = CatalogUtils.createCatalogTable(schema,
Collections.emptyList(), properties, "lance table");
+ // createTable wraps the HoodieValidationException in a
HoodieCatalogException
+ HoodieCatalogException ex = assertThrows(HoodieCatalogException.class,
+ () -> hoodieCatalog.createTable(tablePath, table, false));
+ assertThat(ex.getCause(), instanceOf(HoodieValidationException.class));
+ assertThat(ex.getCause().getMessage(), containsString("Lance base file
format is currently only supported with the Spark engine"));
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDropTable(boolean external) throws
TableAlreadyExistException, DatabaseNotExistException, TableNotExistException,
IOException {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index 751f63d6de75..189e297024fb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -43,6 +44,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -132,6 +134,14 @@ class TestFlinkRowDataReaderContext {
assertTrue(result.getBoolean(2));
}
+ @Test
+ void testLanceFormatThrowsInGetFileRecordIterator() {
+ StoragePath lancePath = new
StoragePath("/tmp/test-table/partition/file.lance");
+ UnsupportedOperationException ex =
assertThrows(UnsupportedOperationException.class,
+ () -> readerContext.getFileRecordIterator(lancePath, 0, 100, SCHEMA,
SCHEMA, null));
+ assertEquals(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG, ex.getMessage());
+ }
+
private GenericRowData createBaseRow(int id, String name, boolean active) {
return GenericRowData.of(id, StringData.fromString(name), active);
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index a86818547c3e..9e7994f0147b 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -127,6 +127,9 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, String[] hosts, long start, long length, HoodieSchema dataSchema,
HoodieSchema
requiredSchema, HoodieStorage storage) throws IOException {
+ if
(filePath.toString().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
+ throw new
UnsupportedOperationException(HoodieFileFormat.LANCE_SPARK_ONLY_ERROR_MSG);
+ }
// mdt file schema irregular and does not work with this logic. Also, log
file evolution is handled inside the log block
boolean isParquetOrOrc =
filePath.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())
||
filePath.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension());