This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 314ffaf06e59 feat: add lance format support for Flink COW table (#18862)
314ffaf06e59 is described below
commit 314ffaf06e5916d0f548d130ca230c3baf2cf2ba
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 1 17:02:53 2026 +0800
feat: add lance format support for Flink COW table (#18862)
---
.../io/storage/row/HoodieRowDataLanceWriter.java | 2 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 25 ++++++++--------------
.../table/format/HoodieRowDataLanceReader.java | 19 +++++++++++++++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 21 ++++++++++++++++++
.../apache/hudi/table/TestHoodieTableFactory.java | 25 ++++++++--------------
5 files changed, 58 insertions(+), 34 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
index f4388c23fc43..6381dba3005f 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataLanceWriter.java
@@ -37,7 +37,7 @@ import java.io.IOException;
import java.util.function.Function;
/**
- * Lance writer for Flink {@link RowData} append-only base files.
+ * Lance writer for Flink {@link RowData} base files.
*/
public class HoodieRowDataLanceWriter extends HoodieBaseLanceWriter<RowData,
String>
implements HoodieRowDataFileWriter {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index ae8dbb1e963b..6241551e79a2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -89,7 +89,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
setupTableOptions(conf.get(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
- checkBaseFileFormatForRead(conf, schema);
+ checkBaseFileFormatForRead(conf);
return new HoodieTableSource(
SerializableSchema.create(schema),
path,
@@ -174,7 +174,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
checkTableType(conf);
- checkBaseFileFormatForWrite(conf, schema);
+ checkBaseFileFormatForWrite(conf);
checkIndexType(conf);
if (!OptionsResolver.isAppendMode(conf)) {
@@ -216,29 +216,22 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
}
/**
- * Validate the base file format. Flink Lance support is scoped to
append-only COW tables.
+ * Validate the base file format. Flink Lance support is scoped to COW
tables.
*/
- private void checkBaseFileFormatForRead(Configuration conf, ResolvedSchema
schema) {
- checkLanceBaseFileFormat(conf, schema);
+ private void checkBaseFileFormatForRead(Configuration conf) {
+ checkLanceBaseFileFormat(conf);
}
- private void checkBaseFileFormatForWrite(Configuration conf, ResolvedSchema
schema) {
- checkLanceBaseFileFormat(conf, schema);
- if (isLanceBaseFileFormat(conf) && !OptionsResolver.isAppendMode(conf)) {
- throw new HoodieValidationException("Flink Lance base-file writes
require append-only INSERT mode. Set '"
- + FlinkOptions.OPERATION.key() + "' = 'insert'.");
- }
+ private void checkBaseFileFormatForWrite(Configuration conf) {
+ checkLanceBaseFileFormat(conf);
}
- private void checkLanceBaseFileFormat(Configuration conf, ResolvedSchema
schema) {
+ private void checkLanceBaseFileFormat(Configuration conf) {
if (!isLanceBaseFileFormat(conf)) {
return;
}
- if (conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key()) ||
schema.getPrimaryKey().isPresent()) {
- throw new HoodieValidationException("Flink Lance base-file support is
only available for append-only tables without primary keys.");
- }
if (OptionsResolver.isMorTable(conf)) {
- throw new HoodieValidationException("Flink Lance base-file support is
only available for COPY_ON_WRITE append-only tables.");
+ throw new HoodieValidationException("Flink Lance base-file support is
only available for COPY_ON_WRITE tables.");
}
if (OptionsResolver.isSchemaEvolutionEnabled(conf)) {
throw new HoodieValidationException("Flink Lance base-file support does
not support schema evolution. Set '"
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
index 8c7564af6730..eba397093ec5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -52,6 +52,7 @@ import org.lance.file.LanceFileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -119,7 +120,23 @@ public class HoodieRowDataLanceReader implements
HoodieFileReader<RowData> {
@Override
public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
- throw new HoodieException("Filtering row keys from Lance files is not
supported for Flink append-only tables without primary keys: " + path);
+ Set<Pair<String, Long>> result = new HashSet<>();
+ long position = 0;
+ boolean includeAllKeys = candidateRowKeys == null ||
candidateRowKeys.isEmpty();
+
+ try (ClosableIterator<String> keyIterator = getRecordKeyIterator()) {
+ while (keyIterator.hasNext()) {
+ String recordKey = keyIterator.next();
+ if (includeAllKeys || candidateRowKeys.contains(recordKey)) {
+ result.add(Pair.of(recordKey, position));
+ }
+ position++;
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to filter row keys from Lance file:
" + path, e);
+ }
+
+ return result;
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 1cc5beaad418..168c1b893beb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1381,6 +1381,27 @@ public class ITTestHoodieDataSource {
assertRowsEquals(projectedRows, "[+I[Alice, id1], +I[Bob, id2]]");
}
+ @Test
+ void testLanceFormatCopyOnWriteUpsertWriteAndRead() {
+ String createHoodieTable = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)
+ .option("hoodie.table.base.file.format", "LANCE")
+ .end();
+ batchTableEnv.executeSql(createHoodieTable);
+
+ execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+ List<Row> rows = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select * from t1").collect());
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+ execInsertSql(batchTableEnv, TestSQL.UPDATE_INSERT_T1);
+ List<Row> updatedRows = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select * from t1").collect());
+ assertRowsEquals(updatedRows, TestData.DATA_SET_SOURCE_MERGED);
+ }
+
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 6aa6f20581c3..73a032bf4313 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -788,7 +788,7 @@ public class TestHoodieTableFactory {
}
@Test
- void testLanceFormatSupportedForAppendOnlyTables() {
+ void testLanceFormatSupportedForCopyOnWriteTables() {
Configuration lanceConf = new Configuration();
lanceConf.set(FlinkOptions.PATH, new File(tempFile,
"lance").getAbsolutePath());
lanceConf.set(FlinkOptions.TABLE_NAME, "lance_t1");
@@ -809,15 +809,7 @@ public class TestHoodieTableFactory {
final MockContext morContext = MockContext.getInstance(morConf,
appendOnlySchema, "f2");
HoodieValidationException morEx =
assertThrows(HoodieValidationException.class,
() -> new HoodieTableFactory().createDynamicTableSink(morContext));
- assertThat(morEx.getMessage(), is("Flink Lance base-file support is only
available for COPY_ON_WRITE append-only tables."));
-
- Configuration upsertConf = new Configuration(lanceConf);
- upsertConf.set(FlinkOptions.OPERATION, "upsert");
- final MockContext upsertContext = MockContext.getInstance(upsertConf,
appendOnlySchema, "f2");
- HoodieValidationException operationEx =
assertThrows(HoodieValidationException.class,
- () -> new HoodieTableFactory().createDynamicTableSink(upsertContext));
- assertThat(operationEx.getMessage(), is("Flink Lance base-file writes
require append-only INSERT mode. Set '"
- + FlinkOptions.OPERATION.key() + "' = 'insert'."));
+ assertThat(morEx.getMessage(), is("Flink Lance base-file support is only
available for COPY_ON_WRITE tables."));
Configuration schemaEvolutionConf = new Configuration(lanceConf);
schemaEvolutionConf.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
"true");
@@ -833,15 +825,16 @@ public class TestHoodieTableFactory {
.primaryKey("f0")
.build();
final MockContext primaryKeyContext = MockContext.getInstance(lanceConf,
primaryKeySchema, "f1");
- HoodieValidationException primaryKeyEx =
assertThrows(HoodieValidationException.class,
- () -> new
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
- assertThat(primaryKeyEx.getMessage(), is("Flink Lance base-file support is
only available for append-only tables without primary keys."));
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(primaryKeyContext));
+
+ Configuration upsertConf = new Configuration(lanceConf);
+ upsertConf.set(FlinkOptions.OPERATION, "upsert");
+ final MockContext upsertContext = MockContext.getInstance(upsertConf,
primaryKeySchema, "f1");
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(upsertContext));
lanceConf.set(FlinkOptions.RECORD_KEY_FIELD, "f0");
final MockContext keyedContext = MockContext.getInstance(lanceConf,
appendOnlySchema, "f2");
- HoodieValidationException sinkEx =
assertThrows(HoodieValidationException.class,
- () -> new HoodieTableFactory().createDynamicTableSink(keyedContext));
- assertThat(sinkEx.getMessage(), is("Flink Lance base-file support is only
available for append-only tables without primary keys."));
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(keyedContext));
}
// -------------------------------------------------------------------------