This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8a7bedb5b Avoiding error log by removing ThreadLocal for Mqtt Client
8a8a7bedb5b is described below

commit 8a8a7bedb5b00137dc6c3897ff73b2cc07cfd13e
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 18:36:08 2025 +0800

    Avoiding error log by removing ThreadLocal for Mqtt Client
---
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java   |  4 ++--
 .../apache/iotdb/db/protocol/session/SessionManager.java   | 14 ++++++++++++++
 .../metadata/fetcher/TableDeviceSchemaFetcher.java         | 12 ++++++++----
 .../metadata/fetcher/TableDeviceSchemaValidator.java       |  6 ++++--
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index ad97dd310c3..86dc21631e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -101,7 +101,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
           ClientVersion.V_1_0,
           useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE);
-      sessionManager.registerSession(session);
+      sessionManager.registerSessionForMqtt(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -110,7 +110,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
-      sessionManager.removeCurrSession();
+      sessionManager.removeCurrSessionForMqtt(session);
       sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 764d5e9fb25..d7c5cb6f8be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -334,6 +334,12 @@ public class SessionManager implements SessionManagerMBean {
     currSessionIdleTime.remove();
   }
 
+  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+    if (mqttClientSession != null) {
+      sessions.remove(mqttClientSession);
+    }
+  }
+
   /**
    * this method can be only used in client-thread model. Do not use this method in message-thread
    * model based service.
@@ -351,6 +357,14 @@ public class SessionManager implements SessionManagerMBean {
     return true;
   }
 
+  /**
+   * this method can be only used in mqtt model. Do not use this method in client-thread model based
+   * service.
+   */
+  public void registerSessionForMqtt(IClientSession session) {
+    sessions.put(session, placeHolder);
+  }
+
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
index e39c8c2507a..a4d10fdc275 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
@@ -118,8 +118,10 @@ public class TableDeviceSchemaFetcher {
               relationSqlParser,
               SessionManager.getInstance().getCurrSession(),
               queryId,
-              SessionManager.getInstance()
-                  .getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()),
+              context == null
+                  ? SessionManager.getInstance()
+                      .getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession())
+                  : context.getSession(),
               "Fetch Device for insert",
               LocalExecutionPlanner.getInstance().metadata,
               // Never timeout for insert
@@ -468,8 +470,10 @@ public class TableDeviceSchemaFetcher {
               relationSqlParser,
               SessionManager.getInstance().getCurrSession(),
               queryId,
-              SessionManager.getInstance()
-                  .getSessionInfo(SessionManager.getInstance().getCurrSession()),
+              mppQueryContext == null
+                  ? SessionManager.getInstance()
+                      .getSessionInfo(SessionManager.getInstance().getCurrSession())
+                  : mppQueryContext.getSession(),
               String.format(
                   "fetch device for query %s : %s",
                   mppQueryContext.getQueryId(), mppQueryContext.getSql()),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
index 7d0e827d75b..b50a875d7b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
@@ -237,8 +237,10 @@ public class TableDeviceSchemaValidator {
             relationSqlParser,
             SessionManager.getInstance().getCurrSession(),
             SessionManager.getInstance().requestQueryId(),
-            SessionManager.getInstance()
-                .getSessionInfo(SessionManager.getInstance().getCurrSession()),
+            context == null
+                ? SessionManager.getInstance()
+                    .getSessionInfo(SessionManager.getInstance().getCurrSession())
+                : context.getSession(),
             "Create device or update device attribute for insert",
             LocalExecutionPlanner.getInstance().metadata,
             // Never timeout for write statement

Reply via email to