This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch subscription-support
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/subscription-support by this push:
new 3f35a50ea2a fix
3f35a50ea2a is described below
commit 3f35a50ea2adec12d3573fd10b33d34028a3066f
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 21 17:33:14 2026 +0800
fix
---
.../protocol/opcua/client/IoTDBOpcUaClient.java | 65 +++++++++++++++++-----
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 12 ++--
2 files changed, 56 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 977d672df73..a9aa95cacdf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -56,9 +57,10 @@ import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -130,7 +132,6 @@ public class IoTDBOpcUaClient {
StatusCode currentQuality = sink.getDefaultQuality();
Object value = null;
long timestamp = 0;
- NodeId nodeId = null;
NodeId opcDataType = null;
for (int i = 0; i < measurementSchemas.size(); ++i) {
@@ -153,17 +154,43 @@ public class IoTDBOpcUaClient {
"When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
continue;
}
- nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
- value = values.get(i);
- timestamp = utcTimestamp;
- opcDataType = convertToOpcDataType(type);
+ if (Objects.isNull(sink.getValueName())) {
+ writeValue(
+ values.get(i),
+ utcTimestamp,
+ convertToOpcDataType(type),
+ currentQuality,
+ segments,
+ name);
+ } else {
+ value = values.get(i);
+ timestamp = utcTimestamp;
+ opcDataType = convertToOpcDataType(type);
+ }
}
if (Objects.isNull(value)) {
return;
}
+ writeValue(value, timestamp, opcDataType, currentQuality, segments, null);
+ }
+
+ private void writeValue(
+ final Object value,
+ final long timestamp,
+ final NodeId opcDataType,
+ final StatusCode currentQuality,
+ final String[] segments,
+ final @Nullable String name)
+ throws Exception {
+ final NodeId nodeId =
+ new NodeId(
+ NAME_SPACE_INDEX,
+ Objects.nonNull(name)
+ ? String.join("/", segments) + "/" + name
+ : String.join("/", segments));
final Variant variant = new Variant(value);
final DataValue dataValue =
new DataValue(variant, currentQuality, new DateTime(timestamp), new
DateTime());
@@ -171,36 +198,41 @@ public class IoTDBOpcUaClient {
if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
final AddNodesResponse addStatus =
- client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
+ client.addNodes(getNodesToAdd(segments, name, opcDataType,
variant)).get();
for (final AddNodesResult result : addStatus.getResults()) {
if (!result.getStatusCode().equals(StatusCode.GOOD)
&& !(result.getStatusCode().getValue() ==
StatusCodes.Bad_NodeIdExists)) {
throw new PipeException(
"Failed to create nodes after transfer data value, creation
status: "
+ addStatus
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
}
writeStatus = client.writeValue(nodeId, dataValue).get();
if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
throw new PipeException(
"Failed to transfer dataValue after successfully created nodes"
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
} else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
throw new PipeException(
"Failed to transfer dataValue"
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
}
private static String getErrorString(
final String[] segments,
+ final @Nullable String name,
final NodeId dataType,
final Object value,
final StatusCode writeStatus) {
- return ", segments: "
- + Arrays.toString(segments)
+ return ", measurement: "
+ + (Objects.nonNull(name)
+ ? String.join(TsFileConstant.PATH_SEPARATOR, segments)
+ + TsFileConstant.PATH_SEPARATOR
+ + name
+ : String.join(TsFileConstant.PATH_SEPARATOR, segments))
+ ", dataType: "
+ dataType
+ ", value: "
@@ -210,7 +242,10 @@ public class IoTDBOpcUaClient {
}
public List<AddNodesItem> getNodesToAdd(
- final String[] segments, final NodeId opcDataType, final Variant
initialValue) {
+ final String[] segments,
+ final @Nullable String name,
+ final NodeId opcDataType,
+ final Variant initialValue) {
final List<AddNodesItem> addNodesItems = new ArrayList<>();
final StringBuilder sb = new StringBuilder(segments[0]);
ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX,
segments[0]).expanded();
@@ -226,7 +261,7 @@ public class IoTDBOpcUaClient {
Identifiers.FolderType.expanded()));
// segments.length >= 3
- for (int i = 1; i < segments.length - 1; ++i) {
+ for (int i = 1; i < (Objects.nonNull(name) ? segments.length :
segments.length - 1); ++i) {
sb.append("/").append(segments[i]);
final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX,
sb.toString()).expanded();
addNodesItems.add(
@@ -242,7 +277,7 @@ public class IoTDBOpcUaClient {
curNodeId = nextId;
}
- final String measurementName = segments[segments.length - 1];
+ final String measurementName = Objects.nonNull(name) ? name :
segments[segments.length - 1];
sb.append("/").append(measurementName);
addNodesItems.add(
new AddNodesItem(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index d1182e59b52..ea3c0539fd5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -294,14 +294,14 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
final UaVariableNode measurementNode;
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
- final DataValue dataValue =
- new DataValue(
- new Variant(values.get(i)),
- currentQuality,
- new DateTime(utcTimestamp),
- new DateTime());
if (Objects.isNull(sink.getValueName())) {
+ final DataValue dataValue =
+ new DataValue(
+ new Variant(values.get(i)),
+ currentQuality,
+ new DateTime(utcTimestamp),
+ new DateTime());
measurementNode = addNode(name, currentFolder, folderNode, dataValue,
type);
if (Objects.isNull(measurementNode.getValue())
|| Objects.isNull(measurementNode.getValue().getSourceTime())