This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch none-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/none-impl by this push:
new 29e49a378ae attribute sync
29e49a378ae is described below
commit 29e49a378ae2351739b7716be9ff0500961875ee
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 22 17:25:29 2026 +0800
attribute sync
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 7 ++---
.../manual/enhanced/IoTDBPipeMetaIT.java | 34 ++++++++++++++++++++--
.../node/schema/CreateOrUpdateTableDeviceNode.java | 3 +-
.../attribute/DeviceAttributeStore.java | 7 +++++
.../db/tools/schema/SRStatementGenerator.java | 9 +++++-
5 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 9c2a09024c4..93f14dc572b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -1055,16 +1055,15 @@ public class TestUtils {
null);
}
- public static void executeNonQueries(
- BaseEnv env, List<String> sqlList, Connection defaultConnection) {
+ public static void executeNonQueries(BaseEnv env, List<String> sqlList, String sqlDialect) {
executeNonQueries(
env,
sqlList,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
null,
- TREE_SQL_DIALECT,
- defaultConnection);
+ sqlDialect,
+ null);
}
public static void executeNonQueries(
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
index 936ca1f5097..f26aa171ae1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java
@@ -232,8 +232,7 @@ public class IoTDBPipeMetaIT extends AbstractPipeTableModelDualManualIT {
"create database root.test",
"alter database root.test with schema_region_group_num=2, data_region_group_num=3",
"create timeSeries root.test.d1.s1 int32",
- "insert into root.test.d1 (s1) values (1)"),
- null);
+ "insert into root.test.d1 (s1) values (1)"));
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
@@ -418,4 +417,35 @@ public class IoTDBPipeMetaIT extends AbstractPipeTableModelDualManualIT {
}
}
}
+
+ @Test
+ public void testAttributeSync() {
+ TestUtils.executeNonQueries(
+ receiverEnv,
+ Arrays.asList(
+ "create database test",
+ "use test",
+ "create table table1(a tag, b attribute, c attribute, d int32)",
+ "insert into table1 (time, a, b, c, d) values(1, 1, null, 1, 1), (2, 2, 2, null, 2)"),
+ BaseEnv.TABLE_SQL_DIALECT);
+
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create database test",
+ "use test",
+ "create table table1(a tag, b attribute, c attribute, d int32)",
+ "insert into table1 (time, a, b, c, d) values(1, 1, 1, null, 1), (2, 2, null, 2, 2)",
+ String.format(
+ "create pipe a2b with source ('inclusion'='schema') with sink ('node-urls'='%s')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())),
+ BaseEnv.TABLE_SQL_DIALECT);
+
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show devices from table1",
+ "a,b,c,",
+ new HashSet<>(Arrays.asList("1,1,1,", "2,2,2,")),
+ "test");
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java
index be4b1337e7e..1ee49df2bdf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanType;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanVisitor;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Constants;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -215,7 +216,7 @@ public class CreateOrUpdateTableDeviceNode extends WritePlanNode implements ISch
ReadWriteIOUtils.writeObject(value, stream);
}
for (int i = 0; i < attributeNameList.size() - deviceAttributeValueList.length; ++i) {
- ReadWriteIOUtils.writeObject(null, stream);
+ ReadWriteIOUtils.writeObject(Constants.NONE, stream);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
index ee0b5f98c8a..4349a7a49f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.attribute.update.UpdateDetailContainer;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Constants;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -163,6 +164,12 @@ public class DeviceAttributeStore implements IDeviceAttributeStore {
final Map<String, Binary> attributeMap = deviceAttributeList.get(pointer);
for (int i = 0; i < nameList.size(); i++) {
final String key = nameList.get(i);
+ if (valueList.length <= i) {
+ break;
+ }
+ if (valueList[i] == Constants.NONE) {
+ continue;
+ }
final Binary value = (Binary) valueList[i];
originMemUsage =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
index 239b55b521b..aa478491f19 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.T
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.snapshot.MemMTreeSnapshotUtil;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Constants;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -447,7 +448,13 @@ public class SRStatementGenerator implements Iterator<Object>, Iterable<Object>
attributeNameList = new ArrayList<>(tableAttributes.keySet());
}
final List<Object> attributeValues =
- attributeNameList.stream().map(tableAttributes::remove).collect(Collectors.toList());
+ attributeNameList.stream()
+ .map(
+ attributeKey -> {
+ final Object attributeValue = tableAttributes.remove(attributeKey);
+ return Objects.nonNull(attributeValue) ? attributeValue : Constants.NONE;
+ })
+ .collect(Collectors.toList());
tableAttributes.forEach(
(attributeKey, attributeValue) -> {
attributeNameList.add(attributeKey);