This is an automated email from the ASF dual-hosted git repository.
jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new bb419f8d09 [#10602] improvement: check update count in
FilesetMetaService.updateFileset (#10613)
bb419f8d09 is described below
commit bb419f8d092dce23b7fc6732c25f63470ac19d4c
Author: Victory Tuduo <[email protected]>
AuthorDate: Thu Apr 2 05:37:13 2026 +0100
[#10602] improvement: check update count in FilesetMetaService.updateFileset
(#10613)
1. Title: [#10602] fix(core): Check update count in
FilesetMetaService.updateFileset
### What changes were proposed in this pull request?
This PR fixes a bug in `FilesetMetaService.updateFileset` where success
was reported unconditionally when `checkNeedUpdateVersion` evaluates to
true. We now capture the row count returned by `updateFilesetMeta(...)`
by calling `SessionUtils.getWithoutCommit` instead of the void
`SessionUtils.doWithoutCommit`. If the updated row count is 0, we treat
it as an update conflict and fail the operation with an `IOException`,
so the version insert made in the same transaction is not committed.
### Why are the changes needed?
Because `updateFilesetMeta(...)` relies on optimistic matching on old
row values, it can return 0 if the metadata row changed between the
read and the update. Previously, `updateFileset` ignored this row count
and unconditionally set the update result to 1 (success). This could
lead to data inconsistencies and masked write failures, where
operations reported success despite the fileset metadata update not
being applied.
Fix: #10602
### Does this PR introduce _any_ user-facing change?
No user-facing APIs were altered.
### How was this patch tested?
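A new unit test
`testUpdateFilesetReturnsSuccessWhenVersionedMetaUpdateAffectsNoRows`
was added to `TestFilesetMetaService.java`. It simulates an optimistic
locking conflict by updating the fileset from inside the updater
callback, then asserts that an `IOException` is thrown instead of the
failure being masked as a silent success (despite the "ReturnsSuccess"
in its name, the test expects the update to fail), and that the
persisted entity still reflects the conflicting update.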
---
.../relational/service/FilesetMetaService.java | 48 +++++++++-----
.../relational/service/TestFilesetMetaService.java | 76 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 16 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 97e7304ae8..118b9f0cbf 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -215,22 +215,38 @@ public class FilesetMetaService {
FilesetPO newFilesetPO =
POConverters.updateFilesetPOWithVersion(oldFilesetPO, newEntity,
checkNeedUpdateVersion);
if (checkNeedUpdateVersion) {
- // These operations are guaranteed to be atomic by the transaction. If
version info is
- // inserted successfully and the uniqueness is guaranteed by
`fileset_id + version +
- // deleted_at`, it means that no other transaction has been inserted
(if a uniqueness
- // conflict occurs, the transaction will be rolled back), then we can
consider that the
- // fileset meta update is successful
- SessionUtils.doMultipleWithCommit(
- () ->
- SessionUtils.doWithoutCommit(
- FilesetVersionMapper.class,
- mapper ->
mapper.insertFilesetVersions(newFilesetPO.getFilesetVersionPOs())),
- () ->
- SessionUtils.doWithoutCommit(
- FilesetMetaMapper.class,
- mapper -> mapper.updateFilesetMeta(newFilesetPO,
oldFilesetPO)));
- // we set the updateResult to 1 to indicate that the update is
successful
- updateResult = 1;
+ // These operations are performed atomically within a single
transaction. The version
+ // insert is protected by a unique constraint on `fileset_id + version
+ deleted_at`. If
+ // the meta update affects 0 rows (concurrent modification), the
transaction is rolled
+ // back — including the version insert — and the update is treated as
a conflict.
+ int[] metaUpdateCountRef = new int[1];
+ try {
+ SessionUtils.doMultipleWithCommit(
+ () ->
+ SessionUtils.doWithoutCommit(
+ FilesetVersionMapper.class,
+ mapper ->
mapper.insertFilesetVersions(newFilesetPO.getFilesetVersionPOs())),
+ () -> {
+ metaUpdateCountRef[0] =
+ SessionUtils.getWithoutCommit(
+ FilesetMetaMapper.class,
+ mapper -> mapper.updateFilesetMeta(newFilesetPO,
oldFilesetPO));
+ if (metaUpdateCountRef[0] == 0) {
+ throw new RuntimeException("Failed to update the entity: " +
identifier);
+ }
+ });
+ updateResult = 1;
+ } catch (RuntimeException re) {
+ if (metaUpdateCountRef[0] == 0) {
+ // The meta update matched no rows; the transaction was rolled
back,
+ // including the version insert above.
+ throw new IOException("Failed to update the entity: " +
identifier);
+ } else {
+ ExceptionUtils.checkSQLException(
+ re, Entity.EntityType.FILESET,
newEntity.nameIdentifier().toString());
+ throw re;
+ }
+ }
} else {
updateResult =
SessionUtils.doWithCommitAndFetchResult(
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
index c292418c1e..24faf760e5 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFilesetMetaService.java
@@ -376,4 +376,80 @@ public class TestFilesetMetaService extends
TestJDBCBackend {
.withAuditInfo(auditInfo)
.build();
}
+
+ @TestTemplate
+ public void
testUpdateFilesetReturnsSuccessWhenVersionedMetaUpdateAffectsNoRows()
+ throws IOException {
+ String filesetName = GravitinoITUtils.genRandomName("tst_fs_conflict");
+ NameIdentifier filesetIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName, filesetName);
+ FilesetEntity filesetEntity =
+ createFilesetEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofFileset(metalakeName, catalogName, schemaName),
+ filesetName,
+ AUDIT_INFO,
+ "/tmp");
+ FilesetMetaService.getInstance().insertFileset(filesetEntity, true);
+
+ AuditInfo conflictingAuditInfo =
+ AuditInfo.builder()
+ .withCreator("conflicting-updater")
+ .withCreateTime(Instant.now())
+ .build();
+ FilesetEntity updatedFilesetEntity =
+ FilesetEntity.builder()
+ .withId(filesetEntity.id())
+ .withName(filesetEntity.name())
+ .withNamespace(filesetEntity.namespace())
+ .withFilesetType(filesetEntity.filesetType())
+ .withStorageLocations(ImmutableMap.of(LOCATION_NAME_UNKNOWN,
"/tmp-v2"))
+ .withComment("comment-v2")
+ .withProperties(ImmutableMap.of("version", "2"))
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator("expected-updater")
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ Exception exception =
+ Assertions.assertThrows(
+ IOException.class,
+ () ->
+ FilesetMetaService.getInstance()
+ .updateFileset(
+ filesetIdent,
+ e -> {
+ // Simulate an optimistic locking conflict
+ try {
+ backend.update(
+ filesetIdent,
+ Entity.EntityType.FILESET,
+ entity -> {
+ FilesetEntity cloned =
+ createFilesetEntity(
+ entity.id(),
+ entity.namespace(),
+ entity.name(),
+ conflictingAuditInfo,
+ "/tmp");
+ return cloned;
+ });
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ return updatedFilesetEntity;
+ }));
+ Assertions.assertTrue(
+ exception.getMessage().contains("Failed to update the entity: " +
filesetIdent));
+
+ FilesetEntity persistedEntity =
+ FilesetMetaService.getInstance().getFilesetByIdentifier(filesetIdent);
+ Assertions.assertEquals(conflictingAuditInfo, persistedEntity.auditInfo());
+ Assertions.assertEquals("", persistedEntity.comment());
+ Assertions.assertNull(persistedEntity.properties());
+ Assertions.assertEquals("/tmp",
persistedEntity.storageLocations().get(LOCATION_NAME_UNKNOWN));
+ Assertions.assertNotEquals(updatedFilesetEntity, persistedEntity);
+ }
}