This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 50eb95c7d6f5 fix(flink): fix disable table service not effective in
hudi-flink (#13875)
50eb95c7d6f5 is described below
commit 50eb95c7d6f5da986244e53627e54a703f6f8678
Author: fhan <[email protected]>
AuthorDate: Thu May 21 16:32:51 2026 +0800
fix(flink): fix disable table service not effective in hudi-flink (#13875)
make the HoodieWriteConfig.TABLE_SERVICES_ENABLED effective for Flink.
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 6 ++
.../apache/hudi/configuration/OptionsResolver.java | 36 ++++++++++--
.../org/apache/hudi/sink/v2/utils/PipelinesV2.java | 8 ++-
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 6 +-
.../org/apache/hudi/table/HoodieTableSink.java | 6 +-
.../hudi/configuration/TestOptionsResolver.java | 68 ++++++++++++++++++++++
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 29 +++++++++
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 31 ++++++++++
8 files changed, 178 insertions(+), 12 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4910c721e09c..fbe29f93ef5b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -188,6 +188,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(false) // keep sync with hoodie style
.withDescription("If enabled, the checkpoint Id will also be written to
hudi metadata.");
+ public static final ConfigOption<Boolean> TABLE_SERVICES_ENABLED =
ConfigOptions
+ .key(HoodieWriteConfig.TABLE_SERVICES_ENABLED.key())
+ .booleanType()
+ .defaultValue(HoodieWriteConfig.TABLE_SERVICES_ENABLED.defaultValue())
+ .withDescription("Master control to disable all table services including
archive, clean, compact, cluster, etc.");
+
// ------------------------------------------------------------------------
// Changelog Capture Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 774a52231e8d..66e6e2815a5a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -296,7 +296,7 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsAsyncCompaction(Configuration conf) {
- return OptionsResolver.isMorTable(conf) &&
conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+ return OptionsResolver.isMorTable(conf) && areTableServicesEnabled(conf)
&& conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
/**
@@ -305,7 +305,7 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsAsyncMetadataCompaction(Configuration conf) {
- return isStreamingIndexWriteEnabled(conf) &&
conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
+ return isStreamingIndexWriteEnabled(conf) && areTableServicesEnabled(conf)
&& conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
}
/**
@@ -314,7 +314,7 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsScheduleMdtCompaction(Configuration conf) {
- return isStreamingIndexWriteEnabled(conf) &&
conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
+ return isStreamingIndexWriteEnabled(conf) && areTableServicesEnabled(conf)
&& conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
}
/**
@@ -324,7 +324,9 @@ public class OptionsResolver {
*/
public static boolean needsScheduleCompaction(Configuration conf) {
return OptionsResolver.isMorTable(conf)
- && conf.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED) &&
!isAppendMode(conf);
+ && areTableServicesEnabled(conf)
+ && conf.get(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
+ && !isAppendMode(conf);
}
/**
@@ -333,7 +335,7 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsAsyncClustering(Configuration conf) {
- return isInsertOperation(conf) &&
conf.get(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
+ return isInsertOperation(conf) && areTableServicesEnabled(conf) &&
conf.get(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
}
/**
@@ -342,6 +344,9 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsScheduleClustering(Configuration conf) {
+ if (!areTableServicesEnabled(conf)) {
+ return false;
+ }
if (!conf.get(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
return false;
}
@@ -548,6 +553,20 @@ public class OptionsResolver {
return
WriteConcurrencyMode.isNonBlockingConcurrencyControl(config.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()));
}
+ /**
+ * Returns whether the cleaning for failed writes is enabled as lazy.
+ */
+ public static boolean isLazyFailedWritesCleaning(Configuration conf) {
+ return needsAsyncCleaning(conf) && isLazyFailedWritesCleanPolicy(conf);
+ }
+
+ /**
+ * Returns whether there is need for async cleaning (planning & execution).
+ */
+ public static boolean needsAsyncCleaning(Configuration conf) {
+ return areTableServicesEnabled(conf);
+ }
+
/**
* Returns whether Cleaner's failed writes policy is set to lazy
*/
@@ -578,6 +597,13 @@ public class OptionsResolver {
return (isCowTable(conf) || conf.get(FlinkOptions.CDC_ENABLED)) &&
isUpsertOperation(conf);
}
+ /**
+ * Returns whether table services are enabled.
+ */
+ public static boolean areTableServicesEnabled(Configuration conf) {
+ return conf.get(FlinkOptions.TABLE_SERVICES_ENABLED);
+ }
+
/**
* Returns the customized insert partitioner instance.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
index 6e93c02bc137..107e8673c42b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
@@ -119,7 +119,7 @@ public class PipelinesV2 {
DataStream<RowData> pipeline = Pipelines.append(conf, rowType,
dataStream);
if (OptionsResolver.needsAsyncClustering(conf)) {
return clusterV2(conf, rowType, pipeline);
- } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+ } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
// add clean function to rollback failed writes for lazy failed writes
cleaning policy
return cleanV2(conf, pipeline);
} else {
@@ -138,8 +138,10 @@ public class PipelinesV2 {
conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED,
false);
}
return compactV2(conf, pipeline);
- } else {
+ } else if (OptionsResolver.needsAsyncCleaning(conf)) {
return cleanV2(conf, pipeline);
+ } else {
+ return pipeline;
}
}
@@ -156,7 +158,7 @@ public class PipelinesV2 {
if (OptionsResolver.isBulkInsertOperation(conf)) {
return conf.get(FlinkOptions.WRITE_TASKS);
} else if (OptionsResolver.isAppendMode(conf)) {
- return OptionsResolver.needsAsyncClustering(conf) ||
OptionsResolver.isLazyFailedWritesCleanPolicy(conf)
+ return OptionsResolver.needsAsyncClustering(conf) ||
OptionsResolver.isLazyFailedWritesCleaning(conf)
? 1 : conf.get(FlinkOptions.WRITE_TASKS);
} else {
return 1;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 721b3f94e470..58be515ca901 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -97,7 +97,7 @@ public class HoodieFlinkStreamer {
pipeline = Pipelines.append(conf, rowType, dataStream);
if (OptionsResolver.needsAsyncClustering(conf)) {
Pipelines.cluster(conf, rowType, pipeline);
- } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+ } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
// add clean function to rollback failed writes for lazy failed writes
cleaning policy
Pipelines.clean(conf, pipeline);
} else {
@@ -108,8 +108,10 @@ public class HoodieFlinkStreamer {
pipeline = Pipelines.hoodieStreamWrite(conf, rowType,
hoodieRecordDataStream);
if (OptionsResolver.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
- } else {
+ } else if (OptionsResolver.needsAsyncCleaning(conf)) {
Pipelines.clean(conf, pipeline);
+ } else {
+ Pipelines.dummySink(pipeline);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index f46e2e672222..45bfde998af6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -112,7 +112,7 @@ public class HoodieTableSink implements
DataStream<RowData> pipeline = Pipelines.append(conf, rowType,
dataStream);
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
- } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
+ } else if (OptionsResolver.isLazyFailedWritesCleaning(conf)) {
// add clean function to rollback failed writes for lazy failed
writes cleaning policy
return Pipelines.clean(conf, pipeline);
} else {
@@ -131,8 +131,10 @@ public class HoodieTableSink implements
conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED,
false);
}
return Pipelines.compact(conf, pipeline);
- } else {
+ } else if (OptionsResolver.needsAsyncCleaning(conf)) {
return Pipelines.clean(conf, pipeline);
+ } else {
+ return Pipelines.dummySink(pipeline);
}
};
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index 6c61d5074432..f7266891e1de 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -19,6 +19,7 @@
package org.apache.hudi.configuration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -123,4 +124,71 @@ public class TestOptionsResolver {
conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
return conf;
}
+
+ @Test
+ void testAreTableServicesEnabled() {
+ Configuration conf = new Configuration();
+ // default value should be true
+ assertTrue(OptionsResolver.areTableServicesEnabled(conf));
+
+ // explicitly set to true
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, true);
+ assertTrue(OptionsResolver.areTableServicesEnabled(conf));
+
+ // explicitly set to false
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+ assertFalse(OptionsResolver.areTableServicesEnabled(conf));
+ }
+
+ @Test
+ void testTableServicesGateCompactionAndCleaning() {
+ Configuration conf = getConf();
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
+
+ assertTrue(OptionsResolver.needsAsyncCompaction(conf));
+ assertTrue(OptionsResolver.needsScheduleCompaction(conf));
+ assertTrue(OptionsResolver.needsAsyncCleaning(conf));
+ assertTrue(OptionsResolver.isLazyFailedWritesCleanPolicy(conf));
+ assertTrue(OptionsResolver.isLazyFailedWritesCleaning(conf));
+
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+ assertFalse(OptionsResolver.needsAsyncCompaction(conf));
+ assertFalse(OptionsResolver.needsScheduleCompaction(conf));
+ assertFalse(OptionsResolver.needsAsyncCleaning(conf));
+ assertTrue(OptionsResolver.isLazyFailedWritesCleanPolicy(conf));
+ assertFalse(OptionsResolver.isLazyFailedWritesCleaning(conf));
+ }
+
+ @Test
+ void testTableServicesGateMetadataCompaction() {
+ Configuration conf = getConf();
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+
+ assertTrue(OptionsResolver.needsAsyncMetadataCompaction(conf));
+ assertTrue(OptionsResolver.needsScheduleMdtCompaction(conf));
+
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+ assertFalse(OptionsResolver.needsAsyncMetadataCompaction(conf));
+ assertFalse(OptionsResolver.needsScheduleMdtCompaction(conf));
+ }
+
+ @Test
+ void testTableServicesGateClustering() {
+ Configuration conf = getConf();
+ conf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+ conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+
+ assertTrue(OptionsResolver.needsAsyncClustering(conf));
+ assertTrue(OptionsResolver.needsScheduleClustering(conf));
+
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+
+ assertFalse(OptionsResolver.needsAsyncClustering(conf));
+ assertFalse(OptionsResolver.needsScheduleClustering(conf));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index a4e869428c24..69087122a2e8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -359,6 +359,35 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ @Test
+ public void testInsertWithTableServiceDisabled() throws Exception {
+ // clustering is requested, but the table services master switch must suppress it
+ conf.set(FlinkOptions.OPERATION, "insert");
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+ conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+ conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(1)
+ .handleEvents(1)
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED4, 1)
+ // insert duplicates again
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .checkpoint(2)
+ .handleEvents(1)
+ .checkpointComplete(2)
+ .checkWrittenDataCOW(EXPECTED5)
+ .end();
+ try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+ long completedReplaceCommit = writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().getInstants().stream().count();
+ long pendingReplaceCommit = writeClient.getHoodieTable().getActiveTimeline().filterPendingReplaceTimeline().getInstants().stream().count();
+ assertEquals(0, completedReplaceCommit);
+ assertEquals(0, pendingReplaceCommit);
+ }
+ }
+
@Test
public void testUpsert() throws Exception {
// open the function and ingest data
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7515f8904327..3f24c2763faf 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -42,7 +43,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.utils.TestData.insertRow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
@@ -56,6 +59,34 @@ public class TestWriteMergeOnReadWithCompact extends
TestWriteCopyOnWrite {
conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
}
+ @Test
+ public void testUpsertWithTableServiceDisabled() throws Exception {
+ // compaction is requested, but the table services master switch must suppress it
+ conf.set(FlinkOptions.TABLE_SERVICES_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .handleEvents(1)
+ .checkpointComplete(1)
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpoint(2)
+ .handleEvents(1)
+ .checkpointComplete(2)
+ .end();
+ try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+ List<HoodieInstant> instants = writeClient.getHoodieTable().getActiveTimeline().getInstants();
+ // requested/inflight compactions keep COMPACTION_ACTION; a finished compaction is recorded as a COMMIT on MOR (writes are deltacommits)
+ long pendingCompaction = instants.stream().filter(s -> s.getAction().equals(COMPACTION_ACTION)).filter(s -> !s.isCompleted()).count();
+ long completedCompaction = writeClient.getHoodieTable().getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants();
+ assertEquals(0, completedCompaction);
+ assertEquals(0, pendingCompaction);
+ }
+ }
+
@Test
public void testPartialFailover() {
// partial failover is only valid for append mode.