This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c94c9a393c7 Fix idempotent table cache update handling (#17959)
c94c9a393c7 is described below
commit c94c9a393c7cd20baad5bb3281553eb2bd175f31
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 17 09:05:40 2026 +0800
Fix idempotent table cache update handling (#17959)
---
.../db/schemaengine/table/DataNodeTableCache.java | 37 ++++++++++++++---
.../schemaengine/table/DataNodeTableCacheTest.java | 48 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index d8d8f4fe19b..11e80f5a63e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -179,7 +179,14 @@ public class DataNodeTableCache implements ITableCache {
// If rename table
if (Objects.nonNull(oldName)) {
// Equals to commit update
- final TsTable oldTable =
preUpdateTableMap.get(database).get(oldName).getLeft();
+ final TsTable oldTable = getTableFromPreUpdateMap(database, oldName);
+ if (Objects.isNull(oldTable)) {
+ LOGGER.info(
+ "Skip rollback renaming old table {}.{} because it has been
handled.",
+ database,
+ oldName);
+ return;
+ }
// Cannot be rolled back, consider:
// 1. Fetched a written CN table
// 2. CN rollback because of timeout
@@ -200,24 +207,42 @@ public class DataNodeTableCache implements ITableCache {
}
private void removeTableFromPreUpdateMap(final String database, final String
tableName) {
- preUpdateTableMap.compute(
+ preUpdateTableMap.computeIfPresent(
database,
(k, v) -> {
- if (v == null) {
- throw new IllegalStateException();
+ final Pair<TsTable, Long> tableVersionPair = v.get(tableName);
+ if (Objects.nonNull(tableVersionPair)) {
+ tableVersionPair.setLeft(null);
}
- v.get(tableName).setLeft(null);
return v;
});
}
+ private @Nullable TsTable getTableFromPreUpdateMap(
+ final String database, final String tableName) {
+ final Map<String, Pair<TsTable, Long>> tableMap =
preUpdateTableMap.get(database);
+ if (Objects.isNull(tableMap)) {
+ return null;
+ }
+ final Pair<TsTable, Long> tableVersionPair = tableMap.get(tableName);
+ return Objects.nonNull(tableVersionPair) ? tableVersionPair.getLeft() :
null;
+ }
+
@Override
public void commitUpdateTable(
String database, final String tableName, final @Nullable String oldName)
{
database = PathUtils.unQualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
- final TsTable newTable =
preUpdateTableMap.get(database).get(tableName).getLeft();
+ final TsTable newTable = getTableFromPreUpdateMap(database, tableName);
+ if (Objects.isNull(newTable)) {
+ LOGGER.info(
+ "Skip commit-update table {}.{} because it has been handled.",
database, tableName);
+ if (Objects.nonNull(oldName)) {
+ removeTableFromPreUpdateMap(database, oldName);
+ }
+ return;
+ }
// Cannot be committed, consider:
// 1. Fetched a non-changed CN table
// 2. CN is changed
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
index 2d6114363a5..4b33991e3d7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
@@ -19,6 +19,12 @@
package org.apache.iotdb.db.schemaengine.table;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Assert;
import org.junit.Test;
@@ -28,6 +34,8 @@ import java.util.concurrent.Semaphore;
public class DataNodeTableCacheTest {
private static final String DATABASE = "interrupted_fetch_database";
+ private static final String TABLE_CACHE_TEST_DATABASE =
"root.table_cache_test";
+ private static final String TABLE_NAME = "table1";
@Test
public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
@@ -50,9 +58,49 @@ public class DataNodeTableCacheTest {
}
}
+ @Test
+ public void commitUpdateTableIsIdempotent() {
+ final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+ cache.invalid(TABLE_CACHE_TEST_DATABASE);
+ try {
+ cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME),
null);
+
+ cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+ cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+
+ Assert.assertEquals(
+ TABLE_NAME, cache.getTable(TABLE_CACHE_TEST_DATABASE,
TABLE_NAME).getTableName());
+ } finally {
+ cache.invalid(TABLE_CACHE_TEST_DATABASE);
+ }
+ }
+
+ @Test
+ public void commitAfterRollbackUpdateTableIsIgnored() {
+ final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+ cache.invalid(TABLE_CACHE_TEST_DATABASE);
+ try {
+ cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME),
null);
+
+ cache.rollbackUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+ cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+
+ Assert.assertNull(cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME,
false));
+ } finally {
+ cache.invalid(TABLE_CACHE_TEST_DATABASE);
+ }
+ }
+
private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache)
throws Exception {
final Field field =
DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
field.setAccessible(true);
return (Semaphore) field.get(cache);
}
+
+ private TsTable createTable(final String tableName) {
+ final TsTable table = new TsTable(tableName);
+ table.addColumnSchema(
+ new FieldColumnSchema("s1", TSDataType.INT32, TSEncoding.RLE,
CompressionType.GZIP));
+ return table;
+ }
}