This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b02d322735 Fix underflow exception caused by serialize function of
DataPartitionTableIntegrityCheckProcedure (#17369)
3b02d322735 is described below
commit 3b02d322735eaca634b9edc8ba57242b5f88158b
Author: libo <[email protected]>
AuthorDate: Fri Mar 27 09:29:35 2026 +0800
Fix underflow exception caused by serialize function of
DataPartitionTableIntegrityCheckProcedure (#17369)
---
.../DataPartitionTableIntegrityCheckProcedure.java | 62 ++++++++-
...aPartitionTableIntegrityCheckProcedureTest.java | 149 +++++++++++++++++++++
2 files changed, 209 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 5f67355b0cb..a0e17794e7a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
@@ -116,9 +117,9 @@ public class DataPartitionTableIntegrityCheckProcedure
*/
private Map<String, DataPartitionTable> finalDataPartitionTables;
- private static Set<TDataNodeConfiguration> skipDataNodes =
+ private Set<TDataNodeConfiguration> skipDataNodes =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- private static Set<TDataNodeConfiguration> failedDataNodes =
+ private Set<TDataNodeConfiguration> failedDataNodes =
Collections.newSetFromMap(new ConcurrentHashMap<>());
// ============Need serialize END=============/
@@ -181,6 +182,12 @@ public class DataPartitionTableIntegrityCheckProcedure
case MERGE_PARTITION_TABLES:
finalDataPartitionTables.clear();
break;
+ case WRITE_PARTITION_TABLE_TO_CONSENSUS:
+ allDataNodes.clear();
+ earliestTimeslots.clear();
+ dataPartitionTables.clear();
+ finalDataPartitionTables.clear();
+ break;
default:
allDataNodes.clear();
earliestTimeslots.clear();
@@ -703,6 +710,7 @@ public class DataPartitionTableIntegrityCheckProcedure
@Override
public void serialize(final DataOutputStream stream) throws IOException {
+
stream.writeShort(ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE.getTypeCode());
super.serialize(stream);
// Serialize earliestTimeslots
@@ -969,4 +977,54 @@ public class DataPartitionTableIntegrityCheckProcedure
return result;
}
+
+ public Map<String, Long> getEarliestTimeslots() {
+ return earliestTimeslots;
+ }
+
+ public Map<Integer, List<DatabaseScopedDataPartitionTable>>
getDataPartitionTables() {
+ return dataPartitionTables;
+ }
+
+ public Set<String> getDatabasesWithLostDataPartition() {
+ return databasesWithLostDataPartition;
+ }
+
+ public Map<String, DataPartitionTable> getFinalDataPartitionTables() {
+ return finalDataPartitionTables;
+ }
+
+ public Set<TDataNodeConfiguration> getSkipDataNodes() {
+ return skipDataNodes;
+ }
+
+ public Set<TDataNodeConfiguration> getFailedDataNodes() {
+ return failedDataNodes;
+ }
+
+ public void setEarliestTimeslots(Map<String, Long> earliestTimeslots) {
+ this.earliestTimeslots = earliestTimeslots;
+ }
+
+ public void setDataPartitionTables(
+ Map<Integer, List<DatabaseScopedDataPartitionTable>>
dataPartitionTables) {
+ this.dataPartitionTables = dataPartitionTables;
+ }
+
+ public void setDatabasesWithLostDataPartition(Set<String>
databasesWithLostDataPartition) {
+ this.databasesWithLostDataPartition = databasesWithLostDataPartition;
+ }
+
+ public void setFinalDataPartitionTables(
+ Map<String, DataPartitionTable> finalDataPartitionTables) {
+ this.finalDataPartitionTables = finalDataPartitionTables;
+ }
+
+ public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) {
+ this.skipDataNodes = skipDataNodes;
+ }
+
+ public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) {
+ this.failedDataNodes = failedDataNodes;
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java
new file mode 100644
index 00000000000..a0cc76fe5c9
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DataPartitionTableIntegrityCheckProcedureTest {
+ @Test
+ public void serDeTest() throws IOException {
+ DataPartitionTableIntegrityCheckProcedure original =
createTestProcedureWithData();
+
+ try (PublicBAOS baos = new PublicBAOS();
+ DataOutputStream dos = new DataOutputStream(baos)) {
+
+ original.serialize(dos);
+
+ System.out.println("Serialized bytes length: " + baos.size());
+
+ ByteBuffer buffer = ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
+
+ Procedure<?> recreated = ProcedureFactory.getInstance().create(buffer);
+
+ if (recreated instanceof DataPartitionTableIntegrityCheckProcedure) {
+ DataPartitionTableIntegrityCheckProcedure actual =
+ (DataPartitionTableIntegrityCheckProcedure) recreated;
+ assertProcedureEquals(original, actual);
+ System.out.println("All checked fields match!");
+ } else {
+ Assert.fail("Recreated is not
DataPartitionTableIntegrityCheckProcedure");
+ }
+ }
+ }
+
+ private DataPartitionTableIntegrityCheckProcedure
createTestProcedureWithData() {
+ DataPartitionTableIntegrityCheckProcedure proc =
+ new DataPartitionTableIntegrityCheckProcedure();
+ String database = "root.test";
+
+ Map<String, Long> earliestTimeslots = new HashMap<>();
+ earliestTimeslots.put(database, 0L);
+ proc.setEarliestTimeslots(earliestTimeslots);
+
+ Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables =
new HashMap<>();
+ DataPartitionTable dataPartitionTable = new DataPartitionTable();
+ dataPartitionTables.put(
+ 1,
+ Collections.singletonList(
+ new DatabaseScopedDataPartitionTable(database,
dataPartitionTable)));
+ proc.setDataPartitionTables(dataPartitionTables);
+
+ Set<String> databasesWithLostDataPartition = new HashSet<>();
+ databasesWithLostDataPartition.add(database);
+ proc.setDatabasesWithLostDataPartition(databasesWithLostDataPartition);
+
+ Map<String, DataPartitionTable> finalDataPartitionTables = new HashMap<>();
+ finalDataPartitionTables.put(database, dataPartitionTable);
+ proc.setFinalDataPartitionTables(finalDataPartitionTables);
+
+ Set<TDataNodeConfiguration> skipNodes = getTDataNodeConfigurations(1);
+ proc.setSkipDataNodes(skipNodes);
+
+ Set<TDataNodeConfiguration> failedNodes = getTDataNodeConfigurations(2);
+ proc.setFailedDataNodes(failedNodes);
+
+ return proc;
+ }
+
+ private static Set<TDataNodeConfiguration> getTDataNodeConfigurations(int
dataNodeId) {
+ Set<TDataNodeConfiguration> nodes = new HashSet<>();
+ TDataNodeLocation tDataNodeConfiguration =
+ new TDataNodeLocation(
+ dataNodeId,
+ new TEndPoint("127.0.0.1", 5),
+ new TEndPoint("127.0.0.1", 6),
+ new TEndPoint("127.0.0.1", 7),
+ new TEndPoint("127.0.0.1", 8),
+ new TEndPoint("127.0.0.1", 9));
+ TNodeResource resource = new TNodeResource(16, 34359738368L);
+ TDataNodeConfiguration skipDataNodeConfiguration =
+ new TDataNodeConfiguration(tDataNodeConfiguration, resource);
+ nodes.add(skipDataNodeConfiguration);
+ return nodes;
+ }
+
+ private void assertProcedureEquals(
+ DataPartitionTableIntegrityCheckProcedure expected,
+ DataPartitionTableIntegrityCheckProcedure actual) {
+ Assert.assertEquals("procId mismatch", expected.getProcId(),
actual.getProcId());
+ Assert.assertEquals("state mismatch", expected.getState(),
actual.getState());
+ Assert.assertEquals(
+ "earliestTimeslots mismatch",
+ expected.getEarliestTimeslots(),
+ actual.getEarliestTimeslots());
+ Assert.assertEquals(
+ "dataPartitionTables mismatch",
+ expected.getDataPartitionTables(),
+ actual.getDataPartitionTables());
+ Assert.assertEquals(
+ "databasesWithLostDataPartition mismatch",
+ expected.getDatabasesWithLostDataPartition(),
+ actual.getDatabasesWithLostDataPartition());
+ Assert.assertEquals(
+ "finalDataPartitionTables mismatch",
+ expected.getFinalDataPartitionTables(),
+ actual.getFinalDataPartitionTables());
+ Assert.assertEquals(
+ "skipDataNodes mismatch", expected.getSkipDataNodes(),
actual.getSkipDataNodes());
+ Assert.assertEquals(
+ "failedDataNodes mismatch", expected.getFailedDataNodes(),
actual.getFailedDataNodes());
+ }
+}