This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch cp-opc-client
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cp-opc-client by this push:
     new bde1028e788 fix
bde1028e788 is described below

commit bde1028e788831d8407bfc5aca6262290240504f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 26 16:53:25 2026 +0800

    fix
---
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     |   1 -
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |  56 ++-----
 .../protocol/opcua/client/IoTDBOpcUaClient.java    |   8 +-
 .../sink/protocol/opcua/server/OpcUaNameSpace.java | 163 +++++++--------------
 4 files changed, 67 insertions(+), 161 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index c31627f6143..4b4c69b951d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 00f52bb5073..6d44322a3d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -26,8 +26,6 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
-import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -48,6 +46,8 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
@@ -78,8 +78,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
@@ -94,6 +92,10 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
@@ -103,11 +105,12 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
 
 /**
  * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data 
are converted into
@@ -208,29 +211,12 @@ public class OpcUaSink implements PipeConnector {
                 Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
                 CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
             .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
-    placeHolder4NullTag =
-        parameters.getStringOrDefault(
-            Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, 
SINK_OPC_UA_PLACEHOLDER_KEY),
-            CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE);
-    final DataRegion region =
-        StorageEngine.getInstance()
-            .getDataRegion(new 
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
-    databaseName = Objects.nonNull(region) ? region.getDatabaseName() : 
"root.__temp_db";
-
-    if (withQuality && PathUtils.isTableModelDatabase(databaseName)) {
-      throw new PipeException(
-          "When the OPC UA sink sets 'with-quality' to true, the table model 
data is not supported.");
-    }
 
     final String nodeUrl =
         parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
     if (Objects.isNull(nodeUrl)) {
       customizeServer(parameters);
     } else {
-      if (PathUtils.isTableModelDatabase(databaseName)) {
-        throw new PipeException(
-            "When the OPC UA sink points to an outer server, the table model 
data is not supported.");
-      }
       customizeClient(nodeUrl, parameters);
     }
   }
@@ -308,16 +294,7 @@ public class OpcUaSink implements PipeConnector {
                                 
.setEnableAnonymousAccess(enableAnonymousAccess)
                                 .setSecurityPolicies(securityPolicies);
                         final OpcUaServer newServer = builder.build();
-                        nameSpace =
-                            new OpcUaNameSpace(
-                                newServer,
-                                parameters
-                                    .getStringOrDefault(
-                                        Arrays.asList(
-                                            CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
-                                        CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-                                    
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
-                                builder);
+                        nameSpace = new OpcUaNameSpace(newServer, builder);
                         nameSpace.startup();
                         newServer.startup().get();
                         return new Pair<>(new AtomicInteger(0), nameSpace);
@@ -434,9 +411,12 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    transferByTablet(tabletInsertionEvent, LOGGER, tablet -> {
+    transferByTablet(
+        tabletInsertionEvent,
+        LOGGER,
+        tablet -> {
           if (Objects.nonNull(nameSpace)) {
-            nameSpace.transfer(tablet);
+            nameSpace.transfer(tablet, this);
           } else if (Objects.nonNull(client)) {
             client.transfer(tablet, this);
           } else {
@@ -540,14 +520,6 @@ public class OpcUaSink implements PipeConnector {
     return isClientServerModel;
   }
 
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  public String getPlaceHolder4NullTag() {
-    return placeHolder4NullTag;
-  }
-
   @Nullable
   public String getValueName() {
     return valueName;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index bf96d988180..94889be906e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
 import org.eclipse.milo.opcua.sdk.core.AccessLevel;
@@ -99,12 +99,12 @@ public class IoTDBOpcUaClient {
   // Only support tree model & client-server
   public void transfer(final Tablet tablet, final OpcUaSink sink) throws 
Exception {
     OpcUaNameSpace.transferTabletForClientServerModel(
-        tablet, false, sink, this::transferTabletRowForClientServerModel);
+        tablet, sink, this::transferTabletRowForClientServerModel);
   }
 
   private void transferTabletRowForClientServerModel(
       final String[] segments,
-      final List<IMeasurementSchema> measurementSchemas,
+      final List<MeasurementSchema> measurementSchemas,
       final List<Long> timestamps,
       final List<Object> values,
       final OpcUaSink sink)
@@ -119,7 +119,7 @@ public class IoTDBOpcUaClient {
       if (Objects.isNull(values.get(i))) {
         continue;
       }
-      final String name = measurementSchemas.get(i).getMeasurementName();
+      final String name = measurementSchemas.get(i).getMeasurementId();
       final TSDataType type = measurementSchemas.get(i).getType();
       if (Objects.nonNull(sink.getQualityName()) && 
sink.getQualityName().equals(name)) {
         if (!type.equals(TSDataType.BOOLEAN)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index c6db63bd913..04cc251d655 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -62,6 +62,8 @@ import java.nio.file.Paths;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -70,16 +72,11 @@ import java.util.UUID;
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
-  private final boolean isClientServerModel;
   private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
-  public OpcUaNameSpace(
-      final OpcUaServer server,
-      final boolean isClientServerModel,
-      final OpcUaServerBuilder builder) {
+  public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
-    this.isClientServerModel = isClientServerModel;
     this.builder = builder;
 
     subscriptionModel = new SubscriptionModel(server, this);
@@ -100,81 +97,43 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
             });
   }
 
-  public void transfer(final Tablet tablet, final boolean isTableModel, final 
OpcUaSink sink)
-      throws Exception {
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws 
Exception {
     if (sink.isClientServerModel()) {
-      transferTabletForClientServerModel(
-          tablet, isTableModel, sink, 
this::transferTabletRowForClientServerModel);
+      transferTabletForClientServerModel(tablet, sink, 
this::transferTabletRowForClientServerModel);
     } else {
-      transferTabletForPubSubModel(tablet, isTableModel, sink);
+      transferTabletForPubSubModel(tablet);
     }
   }
 
   public static void transferTabletForClientServerModel(
-      final Tablet tablet,
-      final boolean isTableModel,
-      final OpcUaSink sink,
-      final TabletRowConsumer consumer)
+      final Tablet tablet, final OpcUaSink sink, final TabletRowConsumer 
consumer)
       throws Exception {
-    final List<IMeasurementSchema> schemas = tablet.getSchemas();
-    final List<IMeasurementSchema> newSchemas = new ArrayList<>();
-    if (!isTableModel) {
-      new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
-
-      final List<Long> timestamps = new ArrayList<>();
-      final List<Object> values = new ArrayList<>();
-
-      for (int i = 0; i < schemas.size(); ++i) {
-        for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
-          if (!tablet.isNull(j, i)) {
-            newSchemas.add(schemas.get(i));
-            timestamps.add(tablet.getTimestamp(j));
-            values.add(
-                getTabletObjectValue4Opc(tablet.getValues()[i], j, 
schemas.get(i).getType()));
-            break;
-          }
-        }
-      }
+    final List<MeasurementSchema> schemas = tablet.getSchemas();
+    final List<MeasurementSchema> newSchemas = new ArrayList<>();
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
-      consumer.accept(tablet.getDeviceId().split("\\."), newSchemas, 
timestamps, values, sink);
-    } else {
-      transferTabletForPubSubModel(tablet);
-    }
-  }
+    final List<Long> timestamps = new ArrayList<>();
+    final List<Object> values = new ArrayList<>();
 
-      for (int i = 0; i < tablet.getRowSize(); ++i) {
-        final Object[] segments = tablet.getDeviceID(i).getSegments();
-        final String[] folderSegments = new String[segments.length + 1];
-        folderSegments[0] = sink.getDatabaseName();
-
-        for (int j = 0; j < segments.length; ++j) {
-          folderSegments[j + 1] =
-              Objects.isNull(segments[j]) ? sink.getPlaceHolder4NullTag() : 
(String) segments[j];
+    for (int i = 0; i < schemas.size(); ++i) {
+      for (int j = tablet.rowSize - 1; j >= 0; --j) {
+        if (tablet.bitMaps == null || tablet.bitMaps[i] == null || 
!tablet.bitMaps[i].isMarked(j)) {
+          newSchemas.add(schemas.get(i));
+          timestamps.add(tablet.timestamps[j]);
+          values.add(getTabletObjectValue4Opc(tablet.values[i], j, 
schemas.get(i).getType()));
+          break;
         }
-
-        final int finalI = i;
-        consumer.accept(
-            folderSegments,
-            newSchemas,
-            Collections.singletonList(tablet.getTimestamp(i)),
-            columnIndexes.stream()
-                .map(
-                    index ->
-                        tablet.isNull(finalI, index)
-                            ? null
-                            : getTabletObjectValue4Opc(
-                                tablet.getValues()[index], finalI, 
schemas.get(index).getType()))
-                .collect(Collectors.toList()),
-            sink);
       }
     }
+
+    consumer.accept(tablet.deviceId.split("\\."), newSchemas, timestamps, 
values, sink);
   }
 
   @FunctionalInterface
   public interface TabletRowConsumer {
     void accept(
         final String[] segments,
-        final List<IMeasurementSchema> measurementSchemas,
+        final List<MeasurementSchema> measurementSchemas,
         final List<Long> timestamps,
         final List<Object> values,
         final OpcUaSink sink)
@@ -183,7 +142,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
   private void transferTabletRowForClientServerModel(
       final String[] segments,
-      final List<IMeasurementSchema> measurementSchemas,
+      final List<MeasurementSchema> measurementSchemas,
       final List<Long> timestamps,
       final List<Object> values,
       final OpcUaSink sink) {
@@ -235,7 +194,8 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                     () ->
                         new PipeRuntimeCriticalException(
                             String.format(
-                                "The folder node for %s does not exist.", 
tablet.deviceId)));
+                                "The folder node for %s does not exist.",
+                                Arrays.toString(segments))));
       }
     }
 
@@ -250,7 +210,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       if (Objects.isNull(values.get(i))) {
         continue;
       }
-      final String name = measurementSchemas.get(i).getMeasurementName();
+      final String name = measurementSchemas.get(i).getMeasurementId();
       final TSDataType type = measurementSchemas.get(i).getType();
       if (Objects.nonNull(sink.getQualityName()) && 
sink.getQualityName().equals(name)) {
         if (!type.equals(TSDataType.BOOLEAN)) {
@@ -270,37 +230,25 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
           Objects.isNull(sink.getValueName()) ? name : 
segments[segments.length - 1];
       final NodeId nodeId = newNodeId(currentFolder + nodeName);
       final UaVariableNode measurementNode;
-
-      int lastNonnullIndex = -1;
-      for (int j = tablet.rowSize - 1; j >= 0; --j) {
-        if (!tablet.bitMaps[i].isMarked(j)) {
-          lastNonnullIndex = j;
-          break;
-        }
-      }
-
-      if (lastNonnullIndex == -1) {
-        continue;
-      }
-
-      final long utcTimestamp = 
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
-      final DataValue value =
+      final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      final DataValue dataValue =
           new DataValue(
-              new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
-              StatusCode.GOOD,
+              new Variant(values.get(i)),
+              currentQuality,
               new DateTime(utcTimestamp),
               new DateTime());
+
       if (!getNodeManager().containsNode(nodeId)) {
         measurementNode =
             new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
-                .setNodeId(newNodeId(currentFolder + name))
+                .setNodeId(nodeId)
                 .setAccessLevel(AccessLevel.READ_WRITE)
                 .setUserAccessLevel(AccessLevel.READ_ONLY)
-                .setBrowseName(newQualifiedName(name))
-                .setDisplayName(LocalizedText.english(name))
+                .setBrowseName(newQualifiedName(nodeName))
+                .setDisplayName(LocalizedText.english(nodeName))
                 .setDataType(convertToOpcDataType(type))
                 .setTypeDefinition(Identifiers.BaseDataVariableType)
-                .setValue(value)
+                .setValue(dataValue)
                 .build();
         getNodeManager().addNode(measurementNode);
         if (Objects.nonNull(folderNode)) {
@@ -324,17 +272,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                                 String.format("The Node %s does not exist.", 
nodeId)));
       }
 
-      final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
       if (Objects.isNull(sink.getValueName())) {
         if (Objects.isNull(measurementNode.getValue())
-            || 
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
-                < utcTimestamp) {
-          measurementNode.setValue(
-              new DataValue(
-                  new Variant(values.get(i)),
-                  currentQuality,
-                  new DateTime(utcTimestamp),
-                  new DateTime()));
+            || Objects.isNull(measurementNode.getValue().getSourceTime())
+            || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
+          measurementNode.setValue(dataValue);
         }
       } else {
         valueNode = measurementNode;
@@ -342,6 +284,15 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         timestamp = utcTimestamp;
       }
     }
+    if (Objects.nonNull(valueNode)) {
+      if (Objects.isNull(valueNode.getValue())
+          || Objects.isNull(valueNode.getValue().getSourceTime())
+          || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
+        valueNode.setValue(
+            new DataValue(
+                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
+      }
+    }
   }
 
   private static Object getTabletObjectValue4Opc(
@@ -388,23 +339,8 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
             .createEvent(
                 new NodeId(getNamespaceIndex(), UUID.randomUUID()), 
Identifiers.BaseEventType);
 
-    List<String> sourceNameList = null;
-    if (isTableModel) {
-      sourceNameList = new ArrayList<>(tablet.getRowSize());
-      for (int i = 0; i < tablet.getRowSize(); ++i) {
-        final StringBuilder idBuilder = new 
StringBuilder(sink.getDatabaseName());
-        for (final Object segment : tablet.getDeviceID(i).getSegments()) {
-          idBuilder
-              .append(TsFileConstant.PATH_SEPARATOR)
-              .append(Objects.isNull(segment) ? sink.getPlaceHolder4NullTag() 
: segment);
-        }
-        sourceNameList.add(idBuilder.toString());
-      }
-    }
-
     // Use eventNode here because other nodes doesn't support values and times 
simultaneously
     for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); 
++columnIndex) {
-
       final TSDataType dataType = 
tablet.getSchemas().get(columnIndex).getType();
 
       // Source name --> Sensor path, like root.test.d_0.s_0
@@ -440,8 +376,8 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
           case DATE:
             eventNode.setMessage(
                 LocalizedText.english(
-                    (((LocalDate[]) tablet.values[columnIndex])[rowIndex])
-                        .atStartOfDay(ZoneId.systemDefault())
+                    ((LocalDate[]) tablet.values[columnIndex])
+                        [rowIndex].atStartOfDay(ZoneId.systemDefault())
                         .toString()));
             break;
           case INT64:
@@ -506,7 +442,6 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       case STRING:
         return Identifiers.String;
       case VECTOR:
-      case OBJECT:
       case UNKNOWN:
       default:
         throw new PipeRuntimeNonCriticalException("Unsupported data type: " + 
type);

Reply via email to