This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cc108e78ec8 Fixed TTL problems (#17735)
cc108e78ec8 is described below

commit cc108e78ec81b2b7d003305b88218fa3327f581e
Author: Caideyipi <[email protected]>
AuthorDate: Sun May 24 14:00:46 2026 +0800

    Fixed TTL problems (#17735)
---
 .../iotdb/confignode/manager/TTLManager.java       |   4 +
 .../iotdb/confignode/persistence/TTLInfo.java      |  33 ++-
 .../procedure/impl/schema/SetTTLProcedure.java     | 244 ++++++++++++---
 .../procedure/state/schema/SetTTLState.java        |   4 +-
 .../iotdb/confignode/persistence/TTLInfoTest.java  |  68 ++++-
 .../procedure/impl/schema/SetTTLProcedureTest.java | 327 +++++++++++++++++++++
 6 files changed, 637 insertions(+), 43 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
index dc6ae4f37a1..284b77004be 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
@@ -127,6 +127,10 @@ public class TTLManager {
     return ((ShowTTLResp) showTTL(new ShowTTLPlan())).getPathTTLMap();
   }
 
+  public long getTTL(final String[] pathPattern) {
+    return ttlInfo.getTTL(pathPattern);
+  }
+
   public int getTTLCount() {
     return ttlInfo.getTTLCount();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
index 7b98ebba50b..4921b03851c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
@@ -70,13 +70,15 @@ public class TTLInfo implements SnapshotProcessor {
     try {
       // check ttl rule capacity
       final int tTlRuleCapacity = 
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
-      if (getTTLCount() >= tTlRuleCapacity) {
+      final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
+      final int requestedTTLRuleCount = ttlCache.getTtlCount() + 
newTTLRuleCount;
+      if (newTTLRuleCount > 0 && requestedTTLRuleCount > tTlRuleCapacity) {
         TSStatus errorStatus = new 
TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode());
         errorStatus.setMessage(
             String.format(
-                "The number of TTL rules has reached the limit (%d). Please 
delete "
-                    + "some existing rules first.",
-                tTlRuleCapacity));
+                "The number of TTL rules has reached the limit "
+                    + "(capacity: %d, requested total: %d). Please delete some 
existing rules first.",
+                tTlRuleCapacity, requestedTTLRuleCount));
         return errorStatus;
       }
       ttlCache.setTTL(plan.getPathPattern(), plan.getTTL());
@@ -92,6 +94,20 @@ public class TTLInfo implements SnapshotProcessor {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  private int calculateNewTTLRuleCount(SetTTLPlan plan) {
+    int newTTLRuleCount = isNewTTLRule(plan.getPathPattern()) ? 1 : 0;
+    if (plan.isDataBase()) {
+      String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), 
plan.getPathPattern().length + 1);
+      pathNodes[pathNodes.length - 1] = 
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+      newTTLRuleCount += isNewTTLRule(pathNodes) ? 1 : 0;
+    }
+    return newTTLRuleCount;
+  }
+
+  private boolean isNewTTLRule(String[] pathNodes) {
+    return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL;
+  }
+
   /** Only used for upgrading from database level ttl to device level ttl. */
   public void setTTL(Map<String, Long> databaseTTLMap) throws 
IllegalPathException {
     lock.writeLock().lock();
@@ -159,6 +175,15 @@ public class TTLInfo implements SnapshotProcessor {
     }
   }
 
+  public long getTTL(final String[] pathPattern) {
+    lock.readLock().lock();
+    try {
+      return ttlCache.getLastNodeTTL(pathPattern);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   /**
    * Get the maximum ttl of the corresponding database level.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
index b90f2df87d5..dca79a02366 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.confignode.procedure.impl.schema;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
 import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -47,14 +49,21 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
 public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SetTTLProcedure.class);
+  // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset 
marker for rollback.
+  private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
+  private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
 
   private SetTTLPlan plan;
+  private long previousTTL = TTL_NOT_EXIST;
+  private long previousDatabaseWildcardTTL = TTL_NOT_EXIST;
+  private boolean previousTTLStateCaptured = false;
 
   public SetTTLProcedure(final boolean isGeneratedByPipe) {
     super(isGeneratedByPipe);
@@ -71,6 +80,10 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     long startTime = System.currentTimeMillis();
     try {
       switch (state) {
+        case CAPTURE_PREVIOUS_TTL:
+          capturePreviousTTLState(env);
+          setNextState(SetTTLState.SET_CONFIGNODE_TTL);
+          return Flow.HAS_MORE_STATE;
         case SET_CONFIGNODE_TTL:
           setConfigNodeTTL(env);
           return Flow.HAS_MORE_STATE;
@@ -86,18 +99,13 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     }
   }
 
-  private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
-    TSStatus res;
-    try {
-      res =
-          env.getConfigManager()
-              .getConsensusManager()
-              .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : 
this.plan);
-    } catch (ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
-      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(e.getMessage());
+  void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
+    if (!previousTTLStateCaptured) {
+      capturePreviousTTLState(env);
+      setNextState(SetTTLState.SET_CONFIGNODE_TTL);
+      return;
     }
+    final TSStatus res = writeConfigNodePlan(env, plan);
     if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, 
res.message);
       setFailure(new ProcedureException(new IoTDBException(res)));
@@ -106,35 +114,177 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    previousTTL = getTTLOrDefault(env, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(env, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final 
SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
   }
 
+  private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final 
String[] pathPattern) {
+    final long ttl = 
env.getConfigManager().getTTLManager().getTTL(pathPattern);
+    return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl;
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the 
executor to unset it.
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : 
ttl);
+    // Database rollback restores the database path and db.** separately, so 
avoid auto-expansion.
+    rollbackPlan.setDataBase(false);
+    final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback ConfigNode ttl failed for "
+                  + String.join(".", pathPattern)
+                  + ": "
+                  + status.getMessage()));
+    }
+  }
+
+  private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+    restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), 
previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnDataNodes(
+          dataNodeLocationMap,
+          getDatabaseWildcardPathPattern(plan.getPathPattern()),
+          previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnDataNodes(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      final String[] pathPattern,
+      final long ttl)
+      throws ProcedureException {
+    if (dataNodeLocationMap.isEmpty()) {
+      return;
+    }
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? 
TTLCache.NULL_TTL : ttl, false));
+    if (hasFailedDataNode(clientHandler)) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback dataNode ttl cache failed for " + String.join(".", 
pathPattern)));
+    }
+  }
+
+  /**
+   * Best-effort rollback: restore both sides, throw the earliest failure, and 
suppress later ones.
+   */
   @Override
-  protected void rollbackState(
-      ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
-      throws IOException, InterruptedException, ProcedureException {}
+  protected void rollbackState(final ConfigNodeProcedureEnv env, final 
SetTTLState setTTLState)
+      throws IOException, InterruptedException, ProcedureException {
+    if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || 
!previousTTLStateCaptured) {
+      return;
+    }
+    ProcedureException rollbackFailure = null;
+    try {
+      rollbackConfigNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+      rollbackFailure = e;
+    }
+    try {
+      rollbackDataNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback DataNode ttl cache.", e);
+      if (rollbackFailure == null) {
+        rollbackFailure = e;
+      } else {
+        rollbackFailure.addSuppressed(e);
+      }
+    }
+    if (rollbackFailure != null) {
+      throw rollbackFailure;
+    }
+  }
+
+  @Override
+  protected boolean isRollbackSupported(final SetTTLState state) {
+    return state == SetTTLState.UPDATE_DATANODE_CACHE;
+  }
 
   @Override
   protected SetTTLState getState(int stateId) {
@@ -148,7 +298,7 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
 
   @Override
   protected SetTTLState getInitialState() {
-    return SetTTLState.SET_CONFIGNODE_TTL;
+    return SetTTLState.CAPTURE_PREVIOUS_TTL;
   }
 
   @Override
@@ -159,14 +309,25 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
             : ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+    stream.writeBoolean(previousTTLStateCaptured);
+    stream.writeLong(previousTTL);
+    stream.writeLong(previousDatabaseWildcardTTL);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      ReadWriteIOUtils.readInt(byteBuffer);
+      final int length = ReadWriteIOUtils.readInt(byteBuffer);
+      final int position = byteBuffer.position();
       this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+      // The serialized plan buffer may include padding; skip to the actual 
payload end.
+      byteBuffer.position(position + length);
+      if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) {
+        this.previousTTLStateCaptured = byteBuffer.get() != 0;
+        this.previousTTL = byteBuffer.getLong();
+        this.previousDatabaseWildcardTTL = byteBuffer.getLong();
+      }
     } catch (IOException e) {
       LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e);
     }
@@ -180,12 +341,21 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    return this.plan.equals(((SetTTLProcedure) o).plan)
-        && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
+    final SetTTLProcedure that = (SetTTLProcedure) o;
+    return this.isGeneratedByPipe == that.isGeneratedByPipe
+        && this.previousTTL == that.previousTTL
+        && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
+        && this.previousTTLStateCaptured == that.previousTTLStateCaptured
+        && this.plan.equals(that.plan);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(plan, isGeneratedByPipe);
+    return Objects.hash(
+        plan,
+        isGeneratedByPipe,
+        previousTTL,
+        previousDatabaseWildcardTTL,
+        previousTTLStateCaptured);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
index fbdc026fc70..4dd3063ea3f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.confignode.procedure.state.schema;
 
 public enum SetTTLState {
+  // Keep existing state ordinals stable for persisted procedures.
   SET_CONFIGNODE_TTL,
-  UPDATE_DATANODE_CACHE
+  UPDATE_DATANODE_CACHE,
+  CAPTURE_PREVIOUS_TTL
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
index 42a23d35cb9..e424671fb3a 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
 import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp;
@@ -50,6 +51,7 @@ public class TTLInfoTest {
   private final File snapshotDir = new File(BASE_OUTPUT_PATH, 
"ttlInfo-snapshot");
   private final long ttl = 123435565323L;
   private long[] originTTLArr;
+  private int originTTlRuleCapacity;
 
   @Before
   public void setup() throws IOException {
@@ -57,6 +59,7 @@ public class TTLInfoTest {
       snapshotDir.mkdirs();
     }
     originTTLArr = CommonDescriptor.getInstance().getConfig().getTierTTLInMs();
+    originTTlRuleCapacity = 
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
     long[] ttlArr = new long[2];
     ttlArr[0] = 10000000L;
     ttlArr[1] = ttl;
@@ -70,6 +73,7 @@ public class TTLInfoTest {
       FileUtils.deleteDirectory(snapshotDir);
     }
     CommonDescriptor.getInstance().getConfig().setTierTTLInMs(originTTLArr);
+    
CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(originTTlRuleCapacity);
   }
 
   @Test
@@ -208,6 +212,17 @@ public class TTLInfoTest {
     Assert.assertEquals(4, ttlInfo.getTTLCount());
   }
 
+  @Test
+  public void testGetTTLReturnsExactPathTTL() throws IllegalPathException {
+    PartialPath path = new PartialPath("root.test.db1.**");
+    ttlInfo.setTTL(new SetTTLPlan(Arrays.asList(path.getNodes()), 121322323L));
+
+    Assert.assertEquals(121322323L, ttlInfo.getTTL(path.getNodes()));
+    Assert.assertEquals(
+        TTLCache.NULL_TTL, ttlInfo.getTTL(new 
PartialPath("root.test.db1").getNodes()));
+    Assert.assertEquals(Long.MAX_VALUE, ttlInfo.getTTL(new 
PartialPath("root.**").getNodes()));
+  }
+
   @Test
   public void testUnsetNonExistTTL() throws IllegalPathException {
     assertEquals(
@@ -241,10 +256,61 @@ public class TTLInfoTest {
     final TSStatus status = ttlInfo.setTTL(setTTLPlan);
     assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code);
     assertEquals(
-        "The number of TTL rules has reached the limit (1000). Please delete 
some existing rules first.",
+        "The number of TTL rules has reached the limit "
+            + "(capacity: 1000, requested total: 1001). Please delete some 
existing rules first.",
         status.message);
   }
 
+  @Test
+  public void testUpdateExistingTTLWhenCapacityIsReached() {
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+    SetTTLPlan setTTLPlan =
+        new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 
1000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+
+    setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+    assertEquals(
+        Long.valueOf(2000),
+        ttlInfo.showTTL(new 
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+  }
+
+  @Test
+  public void testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize() {
+    SetTTLPlan setTTLPlan =
+        new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 
1000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(1);
+
+    setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+    assertEquals(
+        Long.valueOf(2000),
+        ttlInfo.showTTL(new 
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+  }
+
+  @Test
+  public void testDatabaseTTLShouldReserveTwoSlots() {
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+    SetTTLPlan setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1"), 1000);
+    setTTLPlan.setDataBase(true);
+
+    final TSStatus status = ttlInfo.setTTL(setTTLPlan);
+    assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code);
+    assertEquals(1, ttlInfo.getTTLCount());
+    assertEquals(1, ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().size());
+    assertEquals(
+        Long.valueOf(Long.MAX_VALUE),
+        ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.**"));
+  }
+
   @Test
   public void testSnapshot() throws TException, IOException, 
IllegalPathException {
     // set ttl
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
index 5042eb1dd0f..cb09c23659c 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
@@ -19,19 +19,42 @@
 
 package org.apache.iotdb.confignode.procedure.impl.schema;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.TTLManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
+import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class SetTTLProcedureTest {
 
@@ -65,4 +88,308 @@ public class SetTTLProcedureTest {
     buffer.clear();
     byteArrayOutputStream.reset();
   }
+
+  @Test
+  public void serializeDeserializeTestWithCapturedRollbackState() throws 
Exception {
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+
+    procedure.executeFromState(mockProcedureEnv(ttlMap), 
SetTTLState.CAPTURE_PREVIOUS_TTL);
+
+    procedure.serialize(outputStream);
+    final ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    final SetTTLProcedure deserializedProcedure =
+        (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+    assertSerializedProcedure(
+        deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, 
false);
+  }
+
+  @Test
+  public void deserializeOldFormatWithoutRollbackStateTest() throws Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+    writeOldFormatProcedure(outputStream, setTTLPlan);
+
+    final ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    final SetTTLProcedure deserializedProcedure =
+        (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+
+    assertSerializedProcedure(
+        deserializedProcedure,
+        "root.db",
+        2000L,
+        true,
+        false,
+        Long.MIN_VALUE,
+        Long.MIN_VALUE,
+        false);
+  }
+
+  @Test
+  public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured() 
throws Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+
+    procedure.executeFromState(mockProcedureEnv(ttlMap), 
SetTTLState.SET_CONFIGNODE_TTL);
+
+    Assert.assertTrue(procedure.getWrittenPlans().isEmpty());
+    assertSerializedProcedure(procedure, "root.db", 2000L, true, true, 500L, 
600L, false);
+
+    procedure.executeFromState(mockProcedureEnv(ttlMap), 
SetTTLState.SET_CONFIGNODE_TTL);
+
+    Assert.assertEquals(1, procedure.getWrittenPlans().size());
+    assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true);
+  }
+
+  @Test
+  public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() 
throws Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new 
PartialPath("root.test.sg1.**").getNodes()), 1000L);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+    procedure.failFirstDataNodeUpdateForTest();
+
+    final ConfigNodeProcedureEnv env =
+        mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE));
+
+    procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL);
+    procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+    procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+    Assert.assertTrue(procedure.isFailed());
+
+    procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+    Assert.assertEquals(2, procedure.getWrittenPlans().size());
+    assertPlan(procedure.getWrittenPlans().get(0), "root.test.sg1.**", 1000L, 
false);
+    assertPlan(procedure.getWrittenPlans().get(1), "root.test.sg1.**", -1L, 
false);
+
+    Assert.assertEquals(2, procedure.getRequests().size());
+    assertRequest(procedure.getRequests().get(0), "root.test.sg1.**", 1000L, 
false);
+    assertRequest(procedure.getRequests().get(1), "root.test.sg1.**", -1L, 
false);
+  }
+
+  @Test
+  public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws 
Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+    procedure.failFirstDataNodeUpdateForTest();
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+    final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap);
+
+    procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL);
+    procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+    procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+    Assert.assertTrue(procedure.isFailed());
+
+    procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+    Assert.assertEquals(3, procedure.getWrittenPlans().size());
+    assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true);
+    assertPlan(procedure.getWrittenPlans().get(1), "root.db", 500L, false);
+    assertPlan(procedure.getWrittenPlans().get(2), "root.db.**", 600L, false);
+
+    Assert.assertEquals(3, procedure.getRequests().size());
+    assertRequest(procedure.getRequests().get(0), "root.db", 2000L, true);
+    assertRequest(procedure.getRequests().get(1), "root.db", 500L, false);
+    assertRequest(procedure.getRequests().get(2), "root.db.**", 600L, false);
+  }
+
+  private ConfigNodeProcedureEnv mockProcedureEnv(final Map<String, Long> 
ttlMap) {
+    final ConfigNodeProcedureEnv env = 
Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final TTLManager ttlManager = Mockito.mock(TTLManager.class);
+    final NodeManager nodeManager = Mockito.mock(NodeManager.class);
+
+    final TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(1);
+
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getTTLManager()).thenReturn(ttlManager);
+    Mockito.when(ttlManager.getTTL(Mockito.any(String[].class)))
+        .thenAnswer(
+            invocation -> {
+              final String[] pathPattern = invocation.getArgument(0);
+              return ttlMap.getOrDefault(String.join(".", pathPattern), 
TTLCache.NULL_TTL);
+            });
+    Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager);
+    Mockito.when(nodeManager.getRegisteredDataNodeLocations())
+        .thenReturn(Collections.singletonMap(1, dataNodeLocation));
+    return env;
+  }
+
+  private void assertPlan(
+      final SetTTLPlan plan, final String path, final long ttl, final boolean 
isDataBase) {
+    Assert.assertEquals(path, String.join(".", plan.getPathPattern()));
+    Assert.assertEquals(ttl, plan.getTTL());
+    Assert.assertEquals(isDataBase, plan.isDataBase());
+  }
+
+  private void assertRequest(
+      final TSetTTLReq req, final String path, final long ttl, final boolean 
isDataBase) {
+    Assert.assertEquals(Collections.singletonList(path), req.getPathPattern());
+    Assert.assertEquals(ttl, req.getTTL());
+    Assert.assertEquals(isDataBase, req.isDataBase);
+  }
+
+  private void writeOldFormatProcedure(final DataOutputStream stream, final 
SetTTLPlan plan)
+      throws IOException {
+    stream.writeShort(ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
+    // Procedure fields.
+    stream.writeLong(Procedure.NO_PROC_ID);
+    stream.writeInt(ProcedureState.INITIALIZING.ordinal());
+    stream.writeLong(0L);
+    stream.writeLong(0L);
+    stream.writeLong(Procedure.NO_PROC_ID);
+    stream.writeLong(Procedure.NO_TIMEOUT);
+    stream.writeInt(-1); // no stack indexes
+    stream.write((byte) 0); // no exception
+    stream.writeInt(-1); // no result
+    stream.write((byte) 0); // no lock
+    // StateMachineProcedure fields.
+    stream.writeInt(0); // no states
+    ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+  }
+
+  private void assertSerializedProcedure(
+      final SetTTLProcedure procedure,
+      final String path,
+      final long ttl,
+      final boolean isDataBase,
+      final boolean previousTTLStateCaptured,
+      final long previousTTL,
+      final long previousDatabaseWildcardTTL,
+      final boolean isGeneratedByPipe)
+      throws Exception {
+    final Field planField = findField(SetTTLProcedure.class, "plan");
+    planField.setAccessible(true);
+    assertPlan((SetTTLPlan) planField.get(procedure), path, ttl, isDataBase);
+
+    final Field previousTTLStateCapturedField =
+        findField(SetTTLProcedure.class, "previousTTLStateCaptured");
+    previousTTLStateCapturedField.setAccessible(true);
+    Assert.assertEquals(previousTTLStateCaptured, 
previousTTLStateCapturedField.get(procedure));
+
+    final Field previousTTLField = findField(SetTTLProcedure.class, 
"previousTTL");
+    previousTTLField.setAccessible(true);
+    Assert.assertEquals(previousTTL, previousTTLField.get(procedure));
+
+    final Field previousDatabaseWildcardTTLField =
+        findField(SetTTLProcedure.class, "previousDatabaseWildcardTTL");
+    previousDatabaseWildcardTTLField.setAccessible(true);
+    Assert.assertEquals(
+        previousDatabaseWildcardTTL, 
previousDatabaseWildcardTTLField.get(procedure));
+
+    final Field isGeneratedByPipeField = findField(SetTTLProcedure.class, 
"isGeneratedByPipe");
+    isGeneratedByPipeField.setAccessible(true);
+    Assert.assertEquals(isGeneratedByPipe, 
isGeneratedByPipeField.get(procedure));
+  }
+
+  private Field findField(final Class<?> clazz, final String fieldName)
+      throws NoSuchFieldException {
+    Class<?> current = clazz;
+    while (current != null) {
+      try {
+        return current.getDeclaredField(fieldName);
+      } catch (NoSuchFieldException e) {
+        current = current.getSuperclass();
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
+
+  private static class TestingSetTTLProcedure extends SetTTLProcedure {
+
+    private final List<TSetTTLReq> requests = new ArrayList<>();
+    private final List<SetTTLPlan> writtenPlans = new ArrayList<>();
+    private boolean failFirstDataNodeUpdate = false;
+    private int requestCount = 0;
+
+    private TestingSetTTLProcedure(final SetTTLPlan plan) {
+      super(plan, false);
+    }
+
+    private void failFirstDataNodeUpdateForTest() {
+      failFirstDataNodeUpdate = true;
+    }
+
+    private List<TSetTTLReq> getRequests() {
+      return requests;
+    }
+
+    private List<SetTTLPlan> getWrittenPlans() {
+      return writtenPlans;
+    }
+
+    @Override
+    TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final 
SetTTLPlan setTTLPlan) {
+      writtenPlans.add(copyPlan(setTTLPlan));
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+
+    @Override
+    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+        final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+      requests.add(copyRequest(req));
+
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+          new DataNodeAsyncRequestContext<>(
+              CnToDnAsyncRequestType.SET_TTL, copyRequest(req), 
dataNodeLocationMap);
+      final List<Integer> requestIds = new 
ArrayList<>(clientHandler.getNodeLocationMap().keySet());
+      final boolean shouldFail = failFirstDataNodeUpdate && requestCount++ == 
0;
+
+      for (Integer requestId : requestIds) {
+        clientHandler
+            .getResponseMap()
+            .put(
+                requestId,
+                new TSStatus(
+                    shouldFail
+                        ? TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()
+                        : TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+        if (!shouldFail) {
+          clientHandler.getNodeLocationMap().remove(requestId);
+        }
+      }
+      return clientHandler;
+    }
+
+    private SetTTLPlan copyPlan(final SetTTLPlan plan) {
+      final SetTTLPlan copiedPlan =
+          new SetTTLPlan(Arrays.asList(plan.getPathPattern()), plan.getTTL());
+      copiedPlan.setDataBase(plan.isDataBase());
+      return copiedPlan;
+    }
+
+    private TSetTTLReq copyRequest(final TSetTTLReq req) {
+      return new TSetTTLReq(new ArrayList<>(req.getPathPattern()), 
req.getTTL(), req.isDataBase);
+    }
+  }
 }


Reply via email to