This is an automated email from the ASF dual-hosted git repository.

shoorz pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 79d209b3b3f Revert "Merge branch 'dev/1.3' of 
https://github.com/apache/iotdb into dev/1.3"
79d209b3b3f is described below

commit 79d209b3b3f5be785fa98826b6c76389d428cd1c
Author: MiniSho <[email protected]>
AuthorDate: Tue Mar 25 19:16:47 2025 +0800

    Revert "Merge branch 'dev/1.3' of https://github.com/apache/iotdb into 
dev/1.3"
    
    This reverts commit e1bd3fd8a43816b0c6748d38f0fba733f3e06b6c, reversing
    changes made to f019f2627c6b270e4961b4164badf000674d4d20.
---
 .../client/async/CnToDnAsyncRequestType.java       |   1 -
 .../CnToDnInternalServiceAsyncRequestManager.java  |   6 -
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 -
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  28 +----
 .../procedure/PartitionTableAutoCleaner.java       |   3 -
 .../procedure/TimeoutExecutorThread.java           |   2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  15 ---
 .../region/NotifyRegionMigrationProcedure.java     | 137 ---------------------
 .../impl/region/RegionMigrateProcedure.java        |   2 -
 .../impl/schema/UnsetTemplateProcedure.java        |  74 +++--------
 .../state/NotifyRegionMigrationState.java          |  24 ----
 .../procedure/store/ProcedureFactory.java          |   7 --
 .../confignode/procedure/store/ProcedureType.java  |   1 -
 iotdb-core/datanode/pom.xml                        |   2 +-
 .../protocol/opcda/OpcDaServerHandle.java          |   5 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |  29 ++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  10 --
 .../template/TemplateInternalRPCUpdateType.java    |   1 +
 .../iotdb/db/service/RegionMigrateService.java     |  18 ---
 .../agent/SubscriptionBrokerAgent.java             |  10 +-
 .../db/subscription/broker/SubscriptionBroker.java |  25 ++--
 .../broker/SubscriptionPrefetchingQueue.java       |  11 +-
 .../SubscriptionDataNodeResourceManager.java       |  43 -------
 .../resource/log/SubscriptionLogManager.java       |  39 ------
 .../resource/log/SubscriptionLogStatus.java        |  82 ------------
 .../db/utils/datastructure/AlignedTVList.java      |   7 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  21 +---
 .../iotdb/commons/conf/CommonDescriptor.java       |  10 --
 .../subscription/config/SubscriptionConfig.java    |  12 --
 .../src/main/thrift/datanode.thrift                |  10 --
 pom.xml                                            |  13 +-
 32 files changed, 66 insertions(+), 591 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index 225d4341913..e5d1515d27e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -39,7 +39,6 @@ public enum CnToDnAsyncRequestType {
   CREATE_SCHEMA_REGION,
   DELETE_REGION,
   RESET_PEER_LIST,
-  NOTIFY_REGION_MIGRATION,
   UPDATE_REGION_ROUTE_MAP,
   CHANGE_REGION_LEADER,
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 59f99fe57cb..1bae93c7399 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -71,7 +71,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -246,11 +245,6 @@ public class CnToDnInternalServiceAsyncRequestManager
         CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
         (req, client, handler) ->
             client.updateRegionCache((TRegionRouteReq) req, 
(DataNodeTSStatusRPCHandler) handler));
-    actionMapBuilder.put(
-        CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION,
-        (req, client, handler) ->
-            client.notifyRegionMigration(
-                (TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler) 
handler));
     actionMapBuilder.put(
         CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
         (req, client, handler) ->
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 6e2a9dc97cc..dc8f14cfaa9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,7 +202,6 @@ public abstract class 
DataNodeAsyncRequestRPCHandler<Response>
       case STOP_REPAIR_DATA:
       case LOAD_CONFIGURATION:
       case SET_SYSTEM_STATUS:
-      case NOTIFY_REGION_MIGRATION:
       case UPDATE_REGION_ROUTE_MAP:
       case INVALIDATE_SCHEMA_CACHE:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0986e50b31a..87ba3affef1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.confignode.manager.pipe.receiver.protocol;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -284,27 +283,12 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 PrivilegeType.WRITE_SCHEMA.ordinal())
             .getStatus();
       case SetTTL:
-        return Objects.equals(
-                configManager
-                    .getTTLManager()
-                    .getAllTTL()
-                    .get(
-                        String.join(
-                            String.valueOf(IoTDBConstant.PATH_SEPARATOR),
-                            ((SetTTLPlan) plan).getPathPattern())),
-                ((SetTTLPlan) plan).getTTL())
-            ? StatusUtils.OK
-            : configManager
-                .checkUserPrivileges(
-                    username,
-                    ((SetTTLPlan) plan).isDataBase()
-                        ? Collections.emptyList()
-                        : Collections.singletonList(
-                            new PartialPath(((SetTTLPlan) 
plan).getPathPattern())),
-                    ((SetTTLPlan) plan).isDataBase()
-                        ? PrivilegeType.MANAGE_DATABASE.ordinal()
-                        : PrivilegeType.WRITE_SCHEMA.ordinal())
-                .getStatus();
+        return configManager
+            .checkUserPrivileges(
+                username,
+                Collections.singletonList(new PartialPath(((SetTTLPlan) 
plan).getPathPattern())),
+                PrivilegeType.WRITE_SCHEMA.ordinal())
+            .getStatus();
       case UpdateTriggerStateInTable:
       case DeleteTriggerInTable:
         return configManager
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
index ce5d07276db..a4918e6bfb6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
@@ -67,9 +67,6 @@ public class PartitionTableAutoCleaner<Env> extends 
InternalProcedure<Env> {
       }
     }
     if (!databaseTTLMap.isEmpty()) {
-      LOGGER.info(
-          "[PartitionTableCleaner] Periodically activate 
PartitionTableAutoCleaner for: {}",
-          databaseTTLMap);
       // Only clean the partition table when necessary
       TTimePartitionSlot currentTimePartitionSlot =
           TimePartitionUtils.getCurrentTimePartitionSlot();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 5aaf9a623f5..d4f919c01be 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -100,7 +100,7 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
 
     @Override
     public int compareTo(Delayed other) {
-      return Long.compare(
+      return Long.compareUnsigned(
           this.getDelay(TimeUnit.MILLISECONDS), 
other.getDelay(TimeUnit.MILLISECONDS));
     }
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index a21d2349ed9..b6f4fe35d9e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -70,7 +70,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -523,20 +522,6 @@ public class ConfigNodeProcedureEnv {
     return req;
   }
 
-  public List<TSStatus> notifyRegionMigrationToAllDataNodes(
-      TConsensusGroupId consensusGroupId, boolean isStart) {
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations();
-    final TNotifyRegionMigrationReq request =
-        new TNotifyRegionMigrationReq(consensusGroupId, isStart);
-
-    final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> 
clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, request, 
dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    return clientHandler.getResponseList();
-  }
-
   public void persistRegionGroup(CreateRegionGroupsPlan 
createRegionGroupsPlan) {
     // Persist the allocation result
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
deleted file mode 100644
index 0160e9116f9..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.impl.region;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
-import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState;
-import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-/** A procedure that notifies all DNs of the ongoing region migration 
procedure. */
-public class NotifyRegionMigrationProcedure
-    extends RegionOperationProcedure<NotifyRegionMigrationState> {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class);
-
-  private boolean isStart;
-
-  public NotifyRegionMigrationProcedure() {
-    super();
-  }
-
-  public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId, 
boolean isStart) {
-    super(consensusGroupId);
-    this.isStart = isStart;
-  }
-
-  @Override
-  protected Flow executeFromState(ConfigNodeProcedureEnv env, 
NotifyRegionMigrationState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
-    if (regionId == null) {
-      return Flow.NO_MORE_STATE;
-    }
-    try {
-      LOGGER.info(
-          "[pid{}][NotifyRegionMigration] started, region id is {}.", 
getProcId(), regionId);
-      env.notifyRegionMigrationToAllDataNodes(regionId, isStart);
-    } catch (Exception e) {
-      LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed", 
getProcId(), state, e);
-      return Flow.NO_MORE_STATE;
-    }
-    LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete", 
getProcId(), state);
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected void rollbackState(
-      ConfigNodeProcedureEnv configNodeProcedureEnv, 
NotifyRegionMigrationState state)
-      throws IOException, InterruptedException, ProcedureException {}
-
-  @Override
-  protected NotifyRegionMigrationState getState(int stateId) {
-    return NotifyRegionMigrationState.values()[stateId];
-  }
-
-  @Override
-  protected int getStateId(NotifyRegionMigrationState state) {
-    return state.ordinal();
-  }
-
-  @Override
-  protected NotifyRegionMigrationState getInitialState() {
-    return NotifyRegionMigrationState.INIT;
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    
stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode());
-    super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
-    stream.writeBoolean(isStart);
-  }
-
-  @Override
-  public void deserialize(ByteBuffer byteBuffer) {
-    super.deserialize(byteBuffer);
-    try {
-      regionId = 
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
-      isStart = (byteBuffer.get() != (byte) 0);
-    } catch (ThriftSerDeException e) {
-      LOGGER.error("Error in deserialize {}", this.getClass(), e);
-    }
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof NotifyRegionMigrationProcedure)) {
-      return false;
-    }
-    NotifyRegionMigrationProcedure procedure = 
(NotifyRegionMigrationProcedure) obj;
-    return this.regionId.equals(procedure.regionId) && this.isStart == 
procedure.isStart;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(regionId, isStart);
-  }
-
-  @Override
-  public String toString() {
-    return "NotifyRegionMigrationProcedure{"
-        + "regionId="
-        + regionId
-        + ", isStart="
-        + isStart
-        + '}';
-  }
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index 1acdd2ccd54..f4d5dce26b6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -83,7 +83,6 @@ public class RegionMigrateProcedure extends 
RegionOperationProcedure<RegionTrans
               regionId,
               handler.simplifiedLocation(originalDataNode),
               handler.simplifiedLocation(destDataNode));
-          addChildProcedure(new NotifyRegionMigrationProcedure(regionId, 
true));
           setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
         case ADD_REGION_PEER:
@@ -125,7 +124,6 @@ public class RegionMigrateProcedure extends 
RegionOperationProcedure<RegionTrans
               CommonDateTimeUtils.convertMillisecondToDurationStr(
                   System.currentTimeMillis() - getSubmittedTime()),
               DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
-          addChildProcedure(new NotifyRegionMigrationProcedure(regionId, 
false));
           return Flow.NO_MORE_STATE;
         default:
           throw new ProcedureException("Unsupported state: " + state.name());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
index 5233bb7206e..81405960885 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java
@@ -144,6 +144,9 @@ public class UnsetTemplateProcedure
 
   private void invalidateCache(final ConfigNodeProcedureEnv env) {
     try {
+      // Cannot roll back after cache invalidation
+      // Because we do not know whether there are time series successfully 
created
+      alreadyRollback = true;
       executeInvalidateCache(env);
       setNextState(UnsetTemplateState.CHECK_DATANODE_TEMPLATE_ACTIVATION);
     } catch (final ProcedureException e) {
@@ -214,26 +217,21 @@ public class UnsetTemplateProcedure
     }
     alreadyRollback = true;
     ProcedureException rollbackException;
-    try {
-      executeRollbackInvalidateCache(env);
-      final TSStatus status =
-          env.getConfigManager()
-              .getClusterSchemaManager()
-              .rollbackPreUnsetSchemaTemplate(template.getId(), path);
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return;
-      } else {
-        LOGGER.error(
-            "Failed to rollback pre unset template operation of template {} 
set on {}",
-            template.getName(),
-            path);
-        rollbackException =
-            new ProcedureException(
-                new MetadataException(
-                    "Rollback template pre unset failed because of" + 
status.getMessage()));
-      }
-    } catch (final ProcedureException e) {
-      rollbackException = e;
+    final TSStatus status =
+        env.getConfigManager()
+            .getClusterSchemaManager()
+            .rollbackPreUnsetSchemaTemplate(template.getId(), path);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    } else {
+      LOGGER.error(
+          "Failed to rollback pre unset template operation of template {} set 
on {}",
+          template.getName(),
+          path);
+      rollbackException =
+          new ProcedureException(
+              new MetadataException(
+                  "Rollback template pre unset failed because of" + 
status.getMessage()));
     }
     try {
       executeInvalidateCache(env);
@@ -246,42 +244,6 @@ public class UnsetTemplateProcedure
     }
   }
 
-  private void executeRollbackInvalidateCache(ConfigNodeProcedureEnv env)
-      throws ProcedureException {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    TUpdateTemplateReq rollbackTemplateSetInfoReq = new TUpdateTemplateReq();
-    rollbackTemplateSetInfoReq.setType(
-        
TemplateInternalRPCUpdateType.ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO.toByte());
-    rollbackTemplateSetInfoReq.setTemplateInfo(getAddTemplateSetInfo());
-    DataNodeAsyncRequestContext<TUpdateTemplateReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.UPDATE_TEMPLATE,
-            rollbackTemplateSetInfoReq,
-            dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related template cache
-      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(
-            "Failed to rollback template cache of template {} set on {}", 
template.getName(), path);
-        throw new ProcedureException(new MetadataException("Rollback template 
cache failed"));
-      }
-    }
-  }
-
-  private ByteBuffer getAddTemplateSetInfo() {
-    if (this.addTemplateSetInfo == null) {
-      this.addTemplateSetInfo =
-          ByteBuffer.wrap(
-              TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(
-                  template, path.getFullPath()));
-    }
-
-    return addTemplateSetInfo;
-  }
-
   @Override
   protected boolean isRollbackSupported(final UnsetTemplateState 
unsetTemplateState) {
     return true;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
deleted file mode 100644
index 1b964621e5b..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.state;
-
-public enum NotifyRegionMigrationState {
-  INIT,
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index fc88b54f3d5..e5d43137fac 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
 import 
org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
-import 
org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
@@ -121,10 +120,6 @@ public class ProcedureFactory implements IProcedureFactory 
{
         break;
       case RECONSTRUCT_REGION_PROCEDURE:
         procedure = new ReconstructRegionProcedure();
-        break;
-      case NOTIFY_REGION_MIGRATION_PROCEDURE:
-        procedure = new NotifyRegionMigrationProcedure();
-        break;
       case DELETE_TIMESERIES_PROCEDURE:
         procedure = new DeleteTimeSeriesProcedure(false);
         break;
@@ -323,8 +318,6 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
     } else if (procedure instanceof ReconstructRegionProcedure) {
       return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
-    } else if (procedure instanceof NotifyRegionMigrationProcedure) {
-      return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
     } else if (procedure instanceof CreateTriggerProcedure) {
       return ProcedureType.CREATE_TRIGGER_PROCEDURE;
     } else if (procedure instanceof DropTriggerProcedure) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 48ccca42d44..82b28186756 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -39,7 +39,6 @@ public enum ProcedureType {
   RECONSTRUCT_REGION_PROCEDURE((short) 203),
   ADD_REGION_PEER_PROCEDURE((short) 204),
   REMOVE_REGION_PEER_PROCEDURE((short) 205),
-  NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
   @TestOnly
   CREATE_MANY_DATABASES_PROCEDURE((short) 250),
 
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index b97a532249a..ae4ae4c62b2 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -262,7 +262,7 @@
             <artifactId>jersey-container-servlet-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.github.moquette-io.moquette</groupId>
+            <groupId>io.moquette</groupId>
             <artifactId>moquette-broker</artifactId>
         </dependency>
         <dependency>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
index 4f5c6e58a96..3560bd26d3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -65,6 +65,7 @@ public class OpcDaServerHandle implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcDaServerHandle.class);
 
+  private final PointerByReference ppvServer = new PointerByReference();
   private final OpcDaHeader.IOPCServer opcServer;
   private final OpcDaHeader.IOPCItemMgt itemMgt;
   private final OpcDaHeader.IOPCSyncIO syncIO;
@@ -267,7 +268,6 @@ public class OpcDaServerHandle implements Closeable {
     // Free after write
     if (Objects.nonNull(bstr)) {
       OleAuto.INSTANCE.SysFreeString(bstr);
-      bstr = null;
     }
 
     final Pointer pErrors = ppErrors.getValue();
@@ -369,6 +369,9 @@ public class OpcDaServerHandle implements Closeable {
     serverHandleMap.clear();
 
     // Release resource
+    if (Objects.nonNull(ppvServer.getValue())) {
+      Ole32.INSTANCE.CoTaskMemFree(ppvServer.getValue());
+    }
     if (Objects.nonNull(syncIO)) {
       syncIO.Release();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b92c066e8be..bf0b8df2d6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -199,24 +199,17 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       final AtomicInteger eventsReferenceCount = new 
AtomicInteger(sealedFiles.size());
       final AtomicBoolean eventsHadBeenAddedToRetryQueue = new 
AtomicBoolean(false);
 
-      try {
-        for (final File sealedFile : sealedFiles) {
-          transfer(
-              new PipeTransferTsFileHandler(
-                  this,
-                  pipe2WeightMap,
-                  events,
-                  eventsReferenceCount,
-                  eventsHadBeenAddedToRetryQueue,
-                  sealedFile,
-                  null,
-                  false));
-        }
-      } catch (final Throwable t) {
-        LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
-        if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          addFailureEventsToRetryQueue(events);
-        }
+      for (final File sealedFile : sealedFiles) {
+        transfer(
+            new PipeTransferTsFileHandler(
+                this,
+                pipe2WeightMap,
+                events,
+                eventsReferenceCount,
+                eventsHadBeenAddedToRetryQueue,
+                sealedFile,
+                null,
+                false));
       }
     } else {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d0924477154..c5e92c754ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -207,7 +207,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
@@ -1934,6 +1933,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TSStatus updateTemplate(final TUpdateTemplateReq req) {
     switch (TemplateInternalRPCUpdateType.getType(req.type)) {
+        // Reserved for rolling upgrade
       case ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO:
         
ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
         break;
@@ -2145,12 +2145,6 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
   }
 
-  @Override
-  public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws 
TException {
-    RegionMigrateService.getInstance().notifyRegionMigration(req);
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-  }
-
   private TSStatus createNewRegion(ConsensusGroupId regionId, String 
storageGroup) {
     return regionManager.createNewRegion(regionId, storageGroup);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 0af4d91adfe..04d38f25f89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -56,7 +56,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -183,8 +182,6 @@ public class LoadTsFileScheduler implements IScheduler {
           }
           shouldRemoveFileFromLoadingSet = true;
 
-          final long startTimeMs = System.currentTimeMillis();
-
           if (node.isTsFileEmpty()) {
             LOGGER.info("Load skip TsFile {}, because it has no data.", 
filePath);
           } else if (!node.needDecodeTsFile(
@@ -229,13 +226,6 @@ public class LoadTsFileScheduler implements IScheduler {
             }
           }
 
-          if (RegionMigrateService.getInstance().getLastNotifyTime() > 
startTimeMs) {
-            LOGGER.warn(
-                "LoadTsFileScheduler: Region migration started or ended during 
loading TsFile {}, will convert to insertion to avoid data loss",
-                filePath);
-            isLoadSingleTsFileSuccess = false;
-          }
-
           if (isLoadSingleTsFileSuccess) {
             node.clean();
             LOGGER.info(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
index 77193913044..ebe730114e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/template/TemplateInternalRPCUpdateType.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public enum TemplateInternalRPCUpdateType {
+  // Deprecated
   ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO((byte) 0),
   INVALIDATE_TEMPLATE_SET_INFO((byte) 1),
   ADD_TEMPLATE_PRE_SET_INFO((byte) 2),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 05328ea91b9..056df802c13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
 import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -54,7 +53,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class RegionMigrateService implements IService {
@@ -74,9 +72,6 @@ public class RegionMigrateService implements IService {
   // where different asynchronous tasks are submitted to the same datanode 
within a single procedure
   private static final ConcurrentHashMap<Long, TRegionMigrateResult> 
taskResultMap =
       new ConcurrentHashMap<>();
-
-  private static final AtomicLong lastNotifyTime = new 
AtomicLong(Long.MIN_VALUE);
-
   private static final TRegionMigrateResult unfinishedResult = new 
TRegionMigrateResult();
 
   private RegionMigrateService() {}
@@ -85,19 +80,6 @@ public class RegionMigrateService implements IService {
     return Holder.INSTANCE;
   }
 
-  public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
-    lastNotifyTime.set(System.currentTimeMillis());
-    if (req.isIsStart()) {
-      LOGGER.info("Region {} is notified to begin migrating", 
req.getRegionId());
-    } else {
-      LOGGER.info("Region {} is notified to finish migrating", 
req.getRegionId());
-    }
-  }
-
-  public long getLastNotifyTime() {
-    return lastNotifyTime.get();
-  }
-
   /**
    * Submit AddRegionPeerTask
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index f1ee9f8867b..6d70d5796ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.agent;
 
 import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -198,13 +197,8 @@ public class SubscriptionBrokerAgent {
   public boolean executePrefetch(final String consumerGroupId, final String 
topicName) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
-      SubscriptionDataNodeResourceManager.log()
-          .schedule(SubscriptionBrokerAgent.class, consumerGroupId, topicName)
-          .ifPresent(
-              l ->
-                  l.warn(
-                      "Subscription: broker bound to consumer group [{}] does 
not exist",
-                      consumerGroupId));
+      LOGGER.warn(
+          "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
       return false;
     }
     return broker.executePrefetch(topicName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 1223672810d..df888ec1b03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
-import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -420,25 +419,17 @@ public class SubscriptionBroker {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
     if (Objects.isNull(prefetchingQueue)) {
-      SubscriptionDataNodeResourceManager.log()
-          .schedule(SubscriptionBroker.class, brokerId, topicName)
-          .ifPresent(
-              l ->
-                  l.warn(
-                      "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] does not exist",
-                      topicName,
-                      brokerId));
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] does not exist",
+          topicName,
+          brokerId);
       return false;
     }
     if (prefetchingQueue.isClosed()) {
-      SubscriptionDataNodeResourceManager.log()
-          .schedule(SubscriptionBroker.class, brokerId, topicName)
-          .ifPresent(
-              l ->
-                  l.warn(
-                      "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] is closed",
-                      topicName,
-                      brokerId));
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
+          topicName,
+          brokerId);
       return false;
     }
     return prefetchingQueue.executePrefetch();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index abd3e1c300c..0514ac0075e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -29,7 +29,6 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -98,6 +97,9 @@ public abstract class SubscriptionPrefetchingQueue {
 
   private final SubscriptionPrefetchingQueueStates states;
 
+  private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
+  private long lastStateReportTimestamp = System.currentTimeMillis();
+
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
 
@@ -278,9 +280,10 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   private void reportStateIfNeeded() {
-    SubscriptionDataNodeResourceManager.log()
-        .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
-        .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue 
state {}", this));
+    if (System.currentTimeMillis() - lastStateReportTimestamp > 
STATE_REPORT_INTERVAL_IN_MS) {
+      LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+      lastStateReportTimestamp = System.currentTimeMillis();
+    }
   }
 
   @SafeVarargs
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
deleted file mode 100644
index 347299df10f..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource;
-
-import org.apache.iotdb.db.subscription.resource.log.SubscriptionLogManager;
-
-public class SubscriptionDataNodeResourceManager {
-
-  private final SubscriptionLogManager subscriptionLogManager;
-
-  public static SubscriptionLogManager log() {
-    return 
SubscriptionDataNodeResourceManagerHolder.INSTANCE.subscriptionLogManager;
-  }
-
-  ///////////////////////////// SINGLETON /////////////////////////////
-
-  private SubscriptionDataNodeResourceManager() {
-    subscriptionLogManager = new SubscriptionLogManager();
-  }
-
-  private static class SubscriptionDataNodeResourceManagerHolder {
-
-    private static final SubscriptionDataNodeResourceManager INSTANCE =
-        new SubscriptionDataNodeResourceManager();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
deleted file mode 100644
index 549f9344d7f..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource.log;
-
-import org.slf4j.Logger;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class SubscriptionLogManager {
-
-  private final ConcurrentMap<Class<?>, SubscriptionLogStatus> 
logClass2LogStatusMap =
-      new ConcurrentHashMap<>();
-
-  public Optional<Logger> schedule(
-      final Class<?> logClass, final String consumerGroupId, final String 
topicName) {
-    return logClass2LogStatusMap
-        .computeIfAbsent(logClass, k -> new SubscriptionLogStatus(logClass))
-        .schedule(consumerGroupId, topicName);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
deleted file mode 100644
index 0daae1dc937..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.subscription.resource.log;
-
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
-import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-class SubscriptionLogStatus {
-
-  private static final long BASE_INTERVAL_IN_MS =
-      
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs();
-
-  private final Logger logger;
-  private final Cache<Pair<String, String>, AtomicLong> lastReportTimestamps;
-
-  public SubscriptionLogStatus(final Class<?> logClass) {
-    this.logger = LoggerFactory.getLogger(logClass);
-    this.lastReportTimestamps =
-        Caffeine.newBuilder()
-            .expireAfterAccess(
-                
SubscriptionConfig.getInstance().getSubscriptionLogManagerWindowSeconds(),
-                TimeUnit.SECONDS)
-            .build();
-  }
-
-  public Optional<Logger> schedule(final String consumerGroupId, final String 
topicName) {
-    final Pair<String, String> key = new Pair<>(consumerGroupId, topicName);
-    final long now = System.currentTimeMillis();
-    // Calculate the allowed logging interval based on the current prefetching 
queue count
-    final int count = SubscriptionAgent.broker().getPrefetchingQueueCount();
-    final long allowedInterval = BASE_INTERVAL_IN_MS * count;
-    // If the key does not exist, initialize an AtomicLong set to one interval 
before now
-    final AtomicLong lastTime =
-        Objects.requireNonNull(
-            lastReportTimestamps.get(
-                key,
-                k ->
-                    new AtomicLong(
-                        now
-                            // introduce randomness
-                            - BASE_INTERVAL_IN_MS
-                                * ThreadLocalRandom.current().nextLong(1, 
count + 1))));
-    final long last = lastTime.get();
-    if (now - last >= allowedInterval) {
-      // Use compareAndSet to ensure that only one thread updates at a time,
-      // so that only one log entry is printed per allowed interval
-      if (lastTime.compareAndSet(last, now)) {
-        return Optional.of(logger);
-      }
-    }
-    return Optional.empty();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index f379f236290..0b9f4d951b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -1288,11 +1288,8 @@ public abstract class AlignedTVList extends TVList {
     if (largestBinaryChunkSize == 0) {
       return largestPrimitivePointSize;
     }
-    int columnValueCnt = getColumnValueCnt(largestBinaryColumnIndex);
-    if (columnValueCnt == 0) {
-      return largestPrimitivePointSize;
-    }
-    int avgPointSizeOfLargestBinaryColumn = (int) largestBinaryChunkSize / 
columnValueCnt;
+    int avgPointSizeOfLargestBinaryColumn =
+        (int) largestBinaryChunkSize / 
getColumnValueCnt(largestBinaryColumnIndex);
     return Math.max(avgPointSizeOfLargestBinaryColumn, 
largestPrimitivePointSize);
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 99e041cdfb4..4b814a26386 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -308,7 +308,7 @@ public class CommonConfig {
   private int subscriptionPollMaxBlockingTimeMs = 500;
   private int subscriptionDefaultTimeoutInMs = 10_000; // 10s
   private long subscriptionLaunchRetryIntervalMs = 1000;
-  private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s
+  private int subscriptionRecycleUncommittedEventIntervalMs = 600000; // 600s
   private long subscriptionReadFileBufferSize = 8 * MB;
   private long subscriptionReadTabletBufferSize = 8 * MB;
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
@@ -316,8 +316,6 @@ public class CommonConfig {
   private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 * 
KB;
   private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
   private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
-  private long subscriptionLogManagerWindowSeconds = 120; // 120s
-  private long subscriptionLogManagerBaseIntervalMs = 1_000; // 1s
 
   private boolean subscriptionPrefetchEnabled = false;
   private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1510,23 +1508,6 @@ public class CommonConfig {
         subscriptionMaxAllowedEventCountInTabletBatch;
   }
 
-  public long getSubscriptionLogManagerWindowSeconds() {
-    return subscriptionLogManagerWindowSeconds;
-  }
-
-  public void setSubscriptionLogManagerWindowSeconds(long 
subscriptionLogManagerWindowSeconds) {
-    this.subscriptionLogManagerWindowSeconds = 
subscriptionLogManagerWindowSeconds;
-  }
-
-  public long getSubscriptionLogManagerBaseIntervalMs() {
-    return subscriptionLogManagerBaseIntervalMs;
-  }
-
-  public void setSubscriptionLogManagerBaseIntervalMs(
-      final long subscriptionLogManagerBaseIntervalMs) {
-    this.subscriptionLogManagerBaseIntervalMs = 
subscriptionLogManagerBaseIntervalMs;
-  }
-
   public boolean getSubscriptionPrefetchEnabled() {
     return subscriptionPrefetchEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c6b7013b23e..b3613bb6677 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -776,16 +776,6 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_max_allowed_event_count_in_tablet_batch",
                 
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
-    config.setSubscriptionLogManagerWindowSeconds(
-        Long.parseLong(
-            properties.getProperty(
-                "subscription_log_manager_window_seconds",
-                
String.valueOf(config.getSubscriptionLogManagerWindowSeconds()))));
-    config.setSubscriptionLogManagerBaseIntervalMs(
-        Long.parseLong(
-            properties.getProperty(
-                "subscription_log_manager_base_interval_ms",
-                
String.valueOf(config.getSubscriptionLogManagerBaseIntervalMs()))));
 
     config.setSubscriptionPrefetchEnabled(
         Boolean.parseBoolean(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index a332b87f0ed..1ace6e71de8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -97,14 +97,6 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
   }
 
-  public long getSubscriptionLogManagerWindowSeconds() {
-    return COMMON_CONFIG.getSubscriptionLogManagerWindowSeconds();
-  }
-
-  public long getSubscriptionLogManagerBaseIntervalMs() {
-    return COMMON_CONFIG.getSubscriptionLogManagerBaseIntervalMs();
-  }
-
   public boolean getSubscriptionPrefetchEnabled() {
     return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
   }
@@ -179,10 +171,6 @@ public class SubscriptionConfig {
     LOGGER.info(
         "SubscriptionMaxAllowedEventCountInTabletBatch: {}",
         getSubscriptionMaxAllowedEventCountInTabletBatch());
-    LOGGER.info(
-        "SubscriptionLogManagerWindowSeconds: {}", 
getSubscriptionLogManagerWindowSeconds());
-    LOGGER.info(
-        "SubscriptionLogManagerBaseIntervalMs: {}", 
getSubscriptionLogManagerBaseIntervalMs());
 
     LOGGER.info("SubscriptionPrefetchEnabled: {}", 
getSubscriptionPrefetchEnabled());
     LOGGER.info(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index bef96496650..4b25e08d1f6 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -52,11 +52,6 @@ struct TRegionMigrateResult {
   4: required common.TRegionMaintainTaskStatus taskStatus
 }
 
-struct TNotifyRegionMigrationReq {
-  1: required common.TConsensusGroupId regionId
-  2: required bool isStart
-}
-
 struct TCreatePeerReq {
   1: required common.TConsensusGroupId regionId
   2: required list<common.TDataNodeLocation> regionLocations
@@ -786,11 +781,6 @@ service IDataNodeRPCService {
    */
   TRegionMigrateResult getRegionMaintainResult(i64 taskId)
 
-    /**
-     * Notify the DataNode of the beginning or ending the migration of the 
specified RegionGroup
-     */
-    common.TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req)
-
   /**
    * Config node will clean DataNode cache, the Data node will not accept 
read/write request when disabled
    * @param data node location
diff --git a/pom.xml b/pom.xml
index c5dd2933b25..8d8920a5dba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,13 +21,6 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
-    <!--Add the JitPack repository because the moquette-broker dependency 
needs to be resolved from the JitPack repository-->
-    <repositories>
-        <repository>
-            <id>jitpack.io</id>
-            <url>https://jitpack.io</url>
-        </repository>
-    </repositories>
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
@@ -128,7 +121,7 @@
         <mockito.version>2.23.4</mockito.version>
         <!-- This was the last version to support Java 8 -->
         <!--mockito.version>4.11.0</mockito.version-->
-        <moquette.version>0.18.0</moquette.version>
+        <moquette.version>0.17</moquette.version>
         <netty.version>4.1.115.Final</netty.version>
         <nimbus-jose-jwt.version>9.37.3</nimbus-jose-jwt.version>
         <oauth2-oidc-sdk.version>10.15</oauth2-oidc-sdk.version>
@@ -175,7 +168,7 @@
         <thrift.version>0.14.1</thrift.version>
         <xz.version>1.9</xz.version>
         <zstd-jni.version>1.5.6-3</zstd-jni.version>
-        <tsfile.version>1.1.1-250324-SNAPSHOT</tsfile.version>
+        <tsfile.version>1.1.0-250219-SNAPSHOT</tsfile.version>
     </properties>
     <!--
     if we claim dependencies in dependencyManagement, then we do not claim
@@ -439,7 +432,7 @@
                 <version>${reflections.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.github.moquette-io.moquette</groupId>
+                <groupId>io.moquette</groupId>
                 <artifactId>moquette-broker</artifactId>
                 <version>${moquette.version}</version>
             </dependency>

Reply via email to