This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e5f8841a5c [core] Introduce 'deletion-vectors.modifiable' to limit dv
alter (#6666)
e5f8841a5c is described below
commit e5f8841a5c00a068b9f8757b91bee82090f7b8bb
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 25 16:47:20 2025 +0800
[core] Introduce 'deletion-vectors.modifiable' to limit dv alter (#6666)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 6 +++
.../org/apache/paimon/schema/SchemaManager.java | 32 +++++++++++++++-
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../apache/paimon/schema/SchemaManagerTest.java | 43 ++++++++++++++++++++++
.../apache/paimon/table/SimpleTableTestBase.java | 22 ++++++-----
.../paimon/flink/AbstractFlinkTableFactory.java | 3 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 4 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 2 +
9 files changed, 105 insertions(+), 15 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5b0b021eaa..c6101e0a85 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -428,6 +428,12 @@ under the License.
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
</tr>
+ <tr>
+ <td><h5>deletion-vectors.modifiable</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable modifying deletion vectors mode.</td>
+ </tr>
<tr>
<td><h5>disable-explicit-type-casting</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 9cf4056001..fa5b133a3c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1709,6 +1709,12 @@ public class CoreOptions implements Serializable {
+ " vectors are generated when data is
written, which marks the data for deletion."
+ " During read operations, by applying
these index files, merging can be avoided.");
+ public static final ConfigOption<Boolean> DELETION_VECTORS_MODIFIABLE =
+ key("deletion-vectors.modifiable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable modifying deletion
vectors mode.");
+
public static final ConfigOption<MemorySize>
DELETION_VECTOR_INDEX_FILE_TARGET_SIZE =
key("deletion-vector.index-file.target-size")
.memoryType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index cc8df58b2a..82b605204b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -78,6 +78,8 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_MODIFIABLE;
import static org.apache.paimon.CoreOptions.DISTINCT;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
@@ -297,7 +299,10 @@ public class SchemaManager implements Serializable {
SetOption setOption = (SetOption) change;
if (hasSnapshots.get()) {
checkAlterTableOption(
- setOption.key(), oldOptions.get(setOption.key()),
setOption.value());
+ oldOptions,
+ setOption.key(),
+ oldOptions.get(setOption.key()),
+ setOption.value());
}
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
@@ -1072,7 +1077,7 @@ public class SchemaManager implements Serializable {
}
public static void checkAlterTableOption(
- String key, @Nullable String oldValue, String newValue) {
+ Map<String, String> options, String key, @Nullable String
oldValue, String newValue) {
if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
throw new UnsupportedOperationException(
String.format("Change '%s' is not supported yet.", key));
@@ -1092,6 +1097,29 @@ public class SchemaManager implements Serializable {
throw new UnsupportedOperationException("Cannot change bucket
to -1.");
}
}
+
+ if (DELETION_VECTORS_ENABLED.key().equals(key)) {
+ boolean dvModifiable =
+ Boolean.parseBoolean(
+ options.getOrDefault(
+ DELETION_VECTORS_MODIFIABLE.key(),
+
DELETION_VECTORS_MODIFIABLE.defaultValue().toString()));
+ if (!dvModifiable) {
+ boolean oldDv =
+ oldValue == null
+ ? DELETION_VECTORS_ENABLED.defaultValue()
+ : Boolean.parseBoolean(oldValue);
+ boolean newDv = Boolean.parseBoolean(newValue);
+
+ if (oldDv != newDv) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot change deletion vectors mode from
%s to %s. If modifying table deletion-vectors mode without full-compaction,
this may result in data duplication. "
+ + "If you are confident, you can
set table option '%s' = 'true' to allow deletion vectors modification.",
+ oldDv, newDv,
DELETION_VECTORS_MODIFIABLE.key()));
+ }
+ }
+ }
}
public static void checkResetTableOption(String key) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ea8fff0c85..e1ec6ff6c4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -318,7 +318,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
(k, newValue) -> {
String oldValue = oldOptions.get(k);
if (!Objects.equals(oldValue, newValue)) {
- SchemaManager.checkAlterTableOption(k, oldValue,
newValue);
+ SchemaManager.checkAlterTableOption(oldOptions, k,
oldValue, newValue);
}
});
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 5cc68394ca..bc433ff18e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -69,6 +69,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_MODIFIABLE;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -748,4 +750,45 @@ public class SchemaManagerTest {
1, "v", new ArrayType(new
MapType(DataTypes.INT(), innerType))));
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
}
+
+ @Test
+ public void testAlterDeletionVectorsMode() throws Exception {
+ // create table
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(),
tableRoot);
+ manager.createTable(schema);
+
+ // write table
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // assert exception in alter table
+ assertThatThrownBy(
+ () ->
+ manager.commitChanges(
+ SchemaChange.setOption(
+
DELETION_VECTORS_ENABLED.key(), "true")))
+ .hasMessageContaining(
+ "If modifying table deletion-vectors mode without
full-compaction, this may result in data duplication.");
+
+ // assert no exception once 'deletion-vectors.modifiable' is set to true
+
manager.commitChanges(SchemaChange.setOption(DELETION_VECTORS_MODIFIABLE.key(),
"true"));
+
manager.commitChanges(SchemaChange.setOption(DELETION_VECTORS_ENABLED.key(),
"true"));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+
assertThat(table.options().get(DELETION_VECTORS_ENABLED.key())).isEqualTo("true");
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 81a613ac44..69cf5441f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1592,11 +1592,12 @@ public abstract class SimpleTableTestBase {
@Test
public void testDataSplitNotIncludeDvFilesWhenStreamingRead() throws
Exception {
- FileStoreTable table = createFileStoreTable();
- Map<String, String> options = new HashMap<>();
- options.put(DELETION_VECTORS_ENABLED.key(), "true");
- options.put(WRITE_ONLY.key(), "true");
- table = table.copy(options);
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(DELETION_VECTORS_ENABLED, true);
+ options.set(WRITE_ONLY, true);
+ });
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser)) {
@@ -1617,11 +1618,12 @@ public abstract class SimpleTableTestBase {
@Test
public void testDataSplitNotIncludeDvFilesWhenStreamingReadChanges()
throws Exception {
- FileStoreTable table = createFileStoreTable();
- Map<String, String> options = new HashMap<>();
- options.put(DELETION_VECTORS_ENABLED.key(), "true");
- options.put(WRITE_ONLY.key(), "true");
- table = table.copy(options);
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(DELETION_VECTORS_ENABLED, true);
+ options.set(WRITE_ONLY, true);
+ });
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser)) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 136e94f08d..d623b7e933 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -206,7 +206,8 @@ public abstract class AbstractFlinkTableFactory
(key, newValue) -> {
String oldValue = origin.getOptions().get(key);
if (!Objects.equals(oldValue, newValue)) {
- SchemaManager.checkAlterTableOption(key, oldValue,
newValue);
+ SchemaManager.checkAlterTableOption(
+ origin.getOptions(), key, oldValue, newValue);
}
});
Map<String, String> newOptions = new HashMap<>();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index c5e394ca8b..589eeb14af 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -456,10 +456,12 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
Map<String, String> expireOptions = new HashMap<>();
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
- FileStoreTable table = (FileStoreTable) paimonTable(tableName);
+ FileStoreTable table = paimonTable(tableName);
table.copy(expireOptions).newCommit("").expireSnapshots();
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+ sql("ALTER TABLE T SET('deletion-vectors.modifiable' = 'true')");
+
assertThat(
batchSql(
String.format(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 32f07806ca..9f24ee548d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -378,6 +378,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase
{
tEnv.getConfig().set("table.dml-sync", "true");
sql("CALL sys.compact(`table` => 'default.T')");
// disable dv and select
+ sql("ALTER TABLE T SET('deletion-vectors.modifiable' = 'true')");
sql("ALTER TABLE T SET('deletion-vectors.enabled' = 'false')");
assertThat(sql("SELECT * FROM T").size()).isEqualTo(3);
@@ -399,6 +400,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase
{
// full compact
sql("CALL sys.compact(`table` => 'default.TT')");
// disable dv and select
+ sql("ALTER TABLE TT SET('deletion-vectors.modifiable' = 'true')");
sql("ALTER TABLE TT SET('deletion-vectors.enabled' = 'false')");
assertThat(sql("SELECT * FROM TT").size()).isEqualTo(5);
}