This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-large-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 53ec096b457ac7f3cf266d7679c3d77ccd35b65d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 12:20:06 2026 +0800

    partial
---
 .../apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java | 13 +++++++++++++
 .../org/apache/iotdb/db/service/ExternalRPCService.java     |  4 ++++
 2 files changed, 17 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index ac0c10d287f..91b959b229c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -31,9 +31,13 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
 import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink;
+import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.protocol.thrift.impl.IClientRPCServiceWithHandler;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.service.ExternalRPCService;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -47,6 +51,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
@@ -64,6 +69,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -387,10 +393,16 @@ public class OpcUaSink implements PipeConnector {
         || Objects.isNull(environment.getSourceParameters())) {
       return;
     }
+    final IClientRPCServiceWithHandler clientRPCServiceImpl =
+        ExternalRPCService.getInstance().getImpl();
+    if (!(clientRPCServiceImpl instanceof ClientRPCServiceImpl)) {
+      return;
+    }
     markInitializing(true);
 
     try {
       initializeSession();
+      clientRPCServiceImpl.executeLastDataQueryV2(new 
TSLastDataQueryReq(session.getId(), Collections.emptyList(), ));
 
       initialized = true;
       if (Objects.nonNull(session)) {
@@ -427,6 +439,7 @@ public class OpcUaSink implements PipeConnector {
       session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
       session.setZoneId(ZoneId.systemDefault());
       session.setUsername(usernameString);
+      session.setSqlDialect(IClientSession.SqlDialect.TREE);
     }
 
     if (SESSION_MANAGER.getCurrSession() == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java
index dcc8690de8a..b4d576874e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java
@@ -118,6 +118,10 @@ public class ExternalRPCService extends ThriftService 
implements ExternalRPCServ
     return getBindPort();
   }
 
+  public IClientRPCServiceWithHandler getImpl() {
+    return impl;
+  }
+
   private static class RPCServiceHolder {
 
     private static final ExternalRPCService INSTANCE = new 
ExternalRPCService();

Reply via email to