This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_procedure_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a30ee171f56848235998418711d301fc2a6fcaf9
Author: Beyyes <[email protected]>
AuthorDate: Sun Sep 4 17:03:06 2022 +0800

    fix serialize bug in create regions group procedure
---
 .../request/write/CreateRegionGroupsPlan.java      |  14 ++-
 .../impl/CreateRegionGroupsProcedure.java          |  34 +++++--
 .../procedure/state/RemoveConfigNodeState.java     |   3 +-
 .../impl/CreateRegionGroupsProcedureTest.java      | 109 +++++++++++++++++++++
 4 files changed, 148 insertions(+), 12 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
index c9bb8fdedc..57bd998a25 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java
@@ -60,8 +60,18 @@ public class CreateRegionGroupsPlan extends 
ConfigPhysicalPlan {
         .add(regionReplicaSet);
   }
 
+  public void serializeForProcedure(DataOutputStream stream) throws 
IOException {
+    this.serializeImpl(stream);
+  }
+
+  public void deserializeForProcedure(ByteBuffer buffer) throws IOException {
+    // to remove the ordinal of ConfigPhysicalPlanType
+    buffer.getInt();
+    this.deserializeImpl(buffer);
+  }
+
   @Override
-  public void serializeImpl(DataOutputStream stream) throws IOException {
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeInt(ConfigPhysicalPlanType.CreateRegionGroups.ordinal());
 
     stream.writeInt(regionGroupMap.size());
@@ -77,7 +87,7 @@ public class CreateRegionGroupsPlan extends 
ConfigPhysicalPlan {
   }
 
   @Override
-  public void deserializeImpl(ByteBuffer buffer) throws IOException {
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     int storageGroupNum = buffer.getInt();
     for (int i = 0; i < storageGroupNum; i++) {
       String storageGroup = BasicStructureSerDeUtil.readString(buffer);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
index a9ead15463..d564bebce6 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import 
org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan;
 import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
@@ -44,10 +45,7 @@ public class CreateRegionGroupsProcedure
 
   private CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
 
-  /**
-   * key: TConsensusGroupId
-   * value: Failed RegionReplicas
-   */
+  /** key: TConsensusGroupId value: Failed RegionReplicas */
   private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new 
HashMap<>();
 
   public CreateRegionGroupsProcedure() {
@@ -58,6 +56,13 @@ public class CreateRegionGroupsProcedure
     this.createRegionGroupsPlan = createRegionGroupsPlan;
   }
 
+  public CreateRegionGroupsProcedure(
+      CreateRegionGroupsPlan createRegionGroupsPlan,
+      Map<TConsensusGroupId, TRegionReplicaSet> failedRegions) {
+    this.createRegionGroupsPlan = createRegionGroupsPlan;
+    this.failedRegions = failedRegions;
+  }
+
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, 
CreateRegionGroupsState state) {
     switch (state) {
@@ -156,16 +161,29 @@ public class CreateRegionGroupsProcedure
     // must serialize CREATE_REGION_GROUPS.ordinal() firstly
     
stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal());
     super.serialize(stream);
-    createRegionGroupsPlan.serializeImpl(stream);
-
-
+    createRegionGroupsPlan.serializeForProcedure(stream);
+    stream.writeInt(failedRegions.size());
+    failedRegions.forEach(
+        (groupId, replica) -> {
+          ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
+          ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
+        });
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      createRegionGroupsPlan.deserializeImpl(byteBuffer);
+      createRegionGroupsPlan.deserializeForProcedure(byteBuffer);
+      failedRegions.clear();
+      int failedRegionsSize = byteBuffer.getInt();
+      while (failedRegionsSize-- > 0) {
+        TConsensusGroupId groupId =
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+        TRegionReplicaSet replica =
+            ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
+        failedRegions.put(groupId, replica);
+      }
     } catch (Exception e) {
       LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", 
e);
       throw new RuntimeException(e);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
index 9c94ac1fde..dc7acbda82 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java
@@ -20,8 +20,7 @@
 package org.apache.iotdb.confignode.procedure.state;
 
 public enum RemoveConfigNodeState {
-  REMOVE_CONSENSUS_GROUP,
   REMOVE_PEER,
-
+  REMOVE_CONSENSUS_GROUP,
   STOP_CONFIG_NODE
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
new file mode 100644
index 0000000000..97fd408b47
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import 
org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
+import static 
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreateRegionGroupsProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() {
+    TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
+    dataNodeLocation0.setDataNodeId(5);
+    dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+    dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+    dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 
8777));
+    dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 
40010));
+    dataNodeLocation0.setSchemaRegionConsensusEndPoint(new 
TEndPoint("0.0.0.0", 50010));
+
+    TDataNodeLocation dataNodeLocation1 = new TDataNodeLocation();
+    dataNodeLocation1.setDataNodeId(6);
+    dataNodeLocation1.setClientRpcEndPoint(new TEndPoint("0.0.0.1", 6667));
+    dataNodeLocation1.setInternalEndPoint(new TEndPoint("0.0.0.1", 9003));
+    dataNodeLocation1.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.1", 
8777));
+    dataNodeLocation1.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 
40010));
+    dataNodeLocation1.setSchemaRegionConsensusEndPoint(new 
TEndPoint("0.0.0.1", 50010));
+
+    TConsensusGroupId schemaRegionGroupId = new 
TConsensusGroupId(SchemaRegion, 1);
+    TConsensusGroupId dataRegionGroupId = new TConsensusGroupId(DataRegion, 0);
+
+    TRegionReplicaSet schemaRegionSet =
+        new TRegionReplicaSet(schemaRegionGroupId, 
Collections.singletonList(dataNodeLocation0));
+    TRegionReplicaSet dataRegionSet =
+        new TRegionReplicaSet(dataRegionGroupId, 
Collections.singletonList(dataNodeLocation1));
+
+    // to test the equals method of Map<TConsensusGroupId, TRegionReplicaSet>
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions0 =
+        new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+          {
+            put(dataRegionGroupId, dataRegionSet);
+            put(schemaRegionGroupId, schemaRegionSet);
+          }
+        };
+    Map<TConsensusGroupId, TRegionReplicaSet> failedRegions1 =
+        new HashMap<TConsensusGroupId, TRegionReplicaSet>() {
+          {
+            put(schemaRegionGroupId, schemaRegionSet);
+            put(dataRegionGroupId, dataRegionSet);
+          }
+        };
+    assertEquals(failedRegions0, failedRegions1);
+
+    CreateRegionGroupsPlan createRegionGroupsPlan = new 
CreateRegionGroupsPlan();
+    createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
+    createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
+
+    CreateRegionGroupsProcedure procedure0 =
+        new CreateRegionGroupsProcedure(createRegionGroupsPlan, 
failedRegions0);
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    try {
+      procedure0.serialize(outputStream);
+      CreateRegionGroupsProcedure procedure1 = new 
CreateRegionGroupsProcedure();
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+      buffer.getInt();
+      procedure1.deserialize(buffer);
+      assertEquals(procedure0, procedure1);
+    } catch (IOException e) {
+      fail();
+    }
+  }
+}

Reply via email to