This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b02d322735 Fix underflow exception caused by serialize function of DataPartitionTableIntegrityCheckProcedure (#17369)
3b02d322735 is described below

commit 3b02d322735eaca634b9edc8ba57242b5f88158b
Author: libo <[email protected]>
AuthorDate: Fri Mar 27 09:29:35 2026 +0800

    Fix underflow exception caused by serialize function of DataPartitionTableIntegrityCheckProcedure (#17369)
---
 .../DataPartitionTableIntegrityCheckProcedure.java |  62 ++++++++-
 ...aPartitionTableIntegrityCheckProcedureTest.java | 149 +++++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 5f67355b0cb..a0e17794e7a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
@@ -116,9 +117,9 @@ public class DataPartitionTableIntegrityCheckProcedure
    */
   private Map<String, DataPartitionTable> finalDataPartitionTables;
 
-  private static Set<TDataNodeConfiguration> skipDataNodes =
+  private Set<TDataNodeConfiguration> skipDataNodes =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
-  private static Set<TDataNodeConfiguration> failedDataNodes =
+  private Set<TDataNodeConfiguration> failedDataNodes =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   // ============Need serialize END=============/
@@ -181,6 +182,12 @@ public class DataPartitionTableIntegrityCheckProcedure
       case MERGE_PARTITION_TABLES:
         finalDataPartitionTables.clear();
         break;
+      case WRITE_PARTITION_TABLE_TO_CONSENSUS:
+        allDataNodes.clear();
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        finalDataPartitionTables.clear();
+        break;
       default:
         allDataNodes.clear();
         earliestTimeslots.clear();
@@ -703,6 +710,7 @@ public class DataPartitionTableIntegrityCheckProcedure
 
   @Override
   public void serialize(final DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE.getTypeCode());
     super.serialize(stream);
 
     // Serialize earliestTimeslots
@@ -969,4 +977,54 @@ public class DataPartitionTableIntegrityCheckProcedure
 
     return result;
   }
+
+  public Map<String, Long> getEarliestTimeslots() {
+    return earliestTimeslots;
+  }
+
+  public Map<Integer, List<DatabaseScopedDataPartitionTable>> getDataPartitionTables() {
+    return dataPartitionTables;
+  }
+
+  public Set<String> getDatabasesWithLostDataPartition() {
+    return databasesWithLostDataPartition;
+  }
+
+  public Map<String, DataPartitionTable> getFinalDataPartitionTables() {
+    return finalDataPartitionTables;
+  }
+
+  public Set<TDataNodeConfiguration> getSkipDataNodes() {
+    return skipDataNodes;
+  }
+
+  public Set<TDataNodeConfiguration> getFailedDataNodes() {
+    return failedDataNodes;
+  }
+
+  public void setEarliestTimeslots(Map<String, Long> earliestTimeslots) {
+    this.earliestTimeslots = earliestTimeslots;
+  }
+
+  public void setDataPartitionTables(
+      Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables) {
+    this.dataPartitionTables = dataPartitionTables;
+  }
+
+  public void setDatabasesWithLostDataPartition(Set<String> databasesWithLostDataPartition) {
+    this.databasesWithLostDataPartition = databasesWithLostDataPartition;
+  }
+
+  public void setFinalDataPartitionTables(
+      Map<String, DataPartitionTable> finalDataPartitionTables) {
+    this.finalDataPartitionTables = finalDataPartitionTables;
+  }
+
+  public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) {
+    this.skipDataNodes = skipDataNodes;
+  }
+
+  public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) {
+    this.failedDataNodes = failedDataNodes;
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java
new file mode 100644
index 00000000000..a0cc76fe5c9
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DataPartitionTableIntegrityCheckProcedureTest {
+  @Test
+  public void serDeTest() throws IOException {
+    DataPartitionTableIntegrityCheckProcedure original = createTestProcedureWithData();
+
+    try (PublicBAOS baos = new PublicBAOS();
+        DataOutputStream dos = new DataOutputStream(baos)) {
+
+      original.serialize(dos);
+
+      System.out.println("Serialized bytes length: " + baos.size());
+
+      ByteBuffer buffer = ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
+
+      Procedure<?> recreated = ProcedureFactory.getInstance().create(buffer);
+
+      if (recreated instanceof DataPartitionTableIntegrityCheckProcedure) {
+        DataPartitionTableIntegrityCheckProcedure actual =
+            (DataPartitionTableIntegrityCheckProcedure) recreated;
+        assertProcedureEquals(original, actual);
+        System.out.println("All checked fields match!");
+      } else {
+        Assert.fail("Recreated is not DataPartitionTableIntegrityCheckProcedure");
+      }
+    }
+  }
+
+  private DataPartitionTableIntegrityCheckProcedure createTestProcedureWithData() {
+    DataPartitionTableIntegrityCheckProcedure proc =
+        new DataPartitionTableIntegrityCheckProcedure();
+    String database = "root.test";
+
+    Map<String, Long> earliestTimeslots = new HashMap<>();
+    earliestTimeslots.put(database, 0L);
+    proc.setEarliestTimeslots(earliestTimeslots);
+
+    Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables = new HashMap<>();
+    DataPartitionTable dataPartitionTable = new DataPartitionTable();
+    dataPartitionTables.put(
+        1,
+        Collections.singletonList(
+            new DatabaseScopedDataPartitionTable(database, dataPartitionTable)));
+    proc.setDataPartitionTables(dataPartitionTables);
+
+    Set<String> databasesWithLostDataPartition = new HashSet<>();
+    databasesWithLostDataPartition.add(database);
+    proc.setDatabasesWithLostDataPartition(databasesWithLostDataPartition);
+
+    Map<String, DataPartitionTable> finalDataPartitionTables = new HashMap<>();
+    finalDataPartitionTables.put(database, dataPartitionTable);
+    proc.setFinalDataPartitionTables(finalDataPartitionTables);
+
+    Set<TDataNodeConfiguration> skipNodes = getTDataNodeConfigurations(1);
+    proc.setSkipDataNodes(skipNodes);
+
+    Set<TDataNodeConfiguration> failedNodes = getTDataNodeConfigurations(2);
+    proc.setFailedDataNodes(failedNodes);
+
+    return proc;
+  }
+
+  private static Set<TDataNodeConfiguration> getTDataNodeConfigurations(int dataNodeId) {
+    Set<TDataNodeConfiguration> nodes = new HashSet<>();
+    TDataNodeLocation tDataNodeConfiguration =
+        new TDataNodeLocation(
+            dataNodeId,
+            new TEndPoint("127.0.0.1", 5),
+            new TEndPoint("127.0.0.1", 6),
+            new TEndPoint("127.0.0.1", 7),
+            new TEndPoint("127.0.0.1", 8),
+            new TEndPoint("127.0.0.1", 9));
+    TNodeResource resource = new TNodeResource(16, 34359738368L);
+    TDataNodeConfiguration skipDataNodeConfiguration =
+        new TDataNodeConfiguration(tDataNodeConfiguration, resource);
+    nodes.add(skipDataNodeConfiguration);
+    return nodes;
+  }
+
+  private void assertProcedureEquals(
+      DataPartitionTableIntegrityCheckProcedure expected,
+      DataPartitionTableIntegrityCheckProcedure actual) {
+    Assert.assertEquals("procId mismatch", expected.getProcId(), actual.getProcId());
+    Assert.assertEquals("state mismatch", expected.getState(), actual.getState());
+    Assert.assertEquals(
+        "earliestTimeslots mismatch",
+        expected.getEarliestTimeslots(),
+        actual.getEarliestTimeslots());
+    Assert.assertEquals(
+        "dataPartitionTables mismatch",
+        expected.getDataPartitionTables(),
+        actual.getDataPartitionTables());
+    Assert.assertEquals(
+        "databasesWithLostDataPartition mismatch",
+        expected.getDatabasesWithLostDataPartition(),
+        actual.getDatabasesWithLostDataPartition());
+    Assert.assertEquals(
+        "finalDataPartitionTables mismatch",
+        expected.getFinalDataPartitionTables(),
+        actual.getFinalDataPartitionTables());
+    Assert.assertEquals(
+        "skipDataNodes mismatch", expected.getSkipDataNodes(), actual.getSkipDataNodes());
+    Assert.assertEquals(
+        "failedDataNodes mismatch", expected.getFailedDataNodes(), actual.getFailedDataNodes());
+  }
+}

Reply via email to