This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8a7bedb5b Avoiding error log by removing ThreadLocal for Mqtt Client
8a8a7bedb5b is described below
commit 8a8a7bedb5b00137dc6c3897ff73b2cc07cfd13e
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 18:36:08 2025 +0800
Avoiding error log by removing ThreadLocal for Mqtt Client
---
.../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java | 4 ++--
.../apache/iotdb/db/protocol/session/SessionManager.java | 14 ++++++++++++++
.../metadata/fetcher/TableDeviceSchemaFetcher.java | 12 ++++++++----
.../metadata/fetcher/TableDeviceSchemaValidator.java | 6 ++++--
4 files changed, 28 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index ad97dd310c3..86dc21631e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -101,7 +101,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
ClientVersion.V_1_0,
useTableInsert ? IClientSession.SqlDialect.TABLE :
IClientSession.SqlDialect.TREE);
- sessionManager.registerSession(session);
+ sessionManager.registerSessionForMqtt(session);
clientIdToSessionMap.put(msg.getClientID(), session);
}
}
@@ -110,7 +110,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
public void onDisconnect(InterceptDisconnectMessage msg) {
MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
if (null != session) {
- sessionManager.removeCurrSession();
+ sessionManager.removeCurrSessionForMqtt(session);
sessionManager.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 764d5e9fb25..d7c5cb6f8be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -334,6 +334,12 @@ public class SessionManager implements SessionManagerMBean
{
currSessionIdleTime.remove();
}
+ public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+ if (mqttClientSession != null) {
+ sessions.remove(mqttClientSession);
+ }
+ }
+
/**
* this method can be only used in client-thread model. Do not use this
method in message-thread
* model based service.
@@ -351,6 +357,14 @@ public class SessionManager implements SessionManagerMBean
{
return true;
}
+ /**
+ * this method can be only used in mqtt model. Do not use this method in
client-thread model based
+ * service.
+ */
+ public void registerSessionForMqtt(IClientSession session) {
+ sessions.put(session, placeHolder);
+ }
+
/** must be called after registerSession()) will mark the session login. */
public void supplySession(
IClientSession session,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
index e39c8c2507a..a4d10fdc275 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
@@ -118,8 +118,10 @@ public class TableDeviceSchemaFetcher {
relationSqlParser,
SessionManager.getInstance().getCurrSession(),
queryId,
- SessionManager.getInstance()
-
.getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()),
+ context == null
+ ? SessionManager.getInstance()
+
.getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession())
+ : context.getSession(),
"Fetch Device for insert",
LocalExecutionPlanner.getInstance().metadata,
// Never timeout for insert
@@ -468,8 +470,10 @@ public class TableDeviceSchemaFetcher {
relationSqlParser,
SessionManager.getInstance().getCurrSession(),
queryId,
- SessionManager.getInstance()
-
.getSessionInfo(SessionManager.getInstance().getCurrSession()),
+ mppQueryContext == null
+ ? SessionManager.getInstance()
+
.getSessionInfo(SessionManager.getInstance().getCurrSession())
+ : mppQueryContext.getSession(),
String.format(
"fetch device for query %s : %s",
mppQueryContext.getQueryId(), mppQueryContext.getSql()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
index 7d0e827d75b..b50a875d7b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
@@ -237,8 +237,10 @@ public class TableDeviceSchemaValidator {
relationSqlParser,
SessionManager.getInstance().getCurrSession(),
SessionManager.getInstance().requestQueryId(),
- SessionManager.getInstance()
- .getSessionInfo(SessionManager.getInstance().getCurrSession()),
+ context == null
+ ? SessionManager.getInstance()
+
.getSessionInfo(SessionManager.getInstance().getCurrSession())
+ : context.getSession(),
"Create device or update device attribute for insert",
LocalExecutionPlanner.getInstance().metadata,
// Never timeout for write statement