This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/fix_procedure_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a30ee171f56848235998418711d301fc2a6fcaf9 Author: Beyyes <[email protected]> AuthorDate: Sun Sep 4 17:03:06 2022 +0800 fix serialize bug in create regions group procedure --- .../request/write/CreateRegionGroupsPlan.java | 14 ++- .../impl/CreateRegionGroupsProcedure.java | 34 +++++-- .../procedure/state/RemoveConfigNodeState.java | 3 +- .../impl/CreateRegionGroupsProcedureTest.java | 109 +++++++++++++++++++++ 4 files changed, 148 insertions(+), 12 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java index c9bb8fdedc..57bd998a25 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionGroupsPlan.java @@ -60,8 +60,18 @@ public class CreateRegionGroupsPlan extends ConfigPhysicalPlan { .add(regionReplicaSet); } + public void serializeForProcedure(DataOutputStream stream) throws IOException { + this.serializeImpl(stream); + } + + public void deserializeForProcedure(ByteBuffer buffer) throws IOException { + // to remove the ordinal of ConfigPhysicalPlanType + buffer.getInt(); + this.deserializeImpl(buffer); + } + @Override - public void serializeImpl(DataOutputStream stream) throws IOException { + protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeInt(ConfigPhysicalPlanType.CreateRegionGroups.ordinal()); stream.writeInt(regionGroupMap.size()); @@ -77,7 +87,7 @@ public class CreateRegionGroupsPlan extends ConfigPhysicalPlan { } @Override - public void deserializeImpl(ByteBuffer buffer) throws IOException { + protected void deserializeImpl(ByteBuffer buffer) throws IOException { int storageGroupNum = buffer.getInt(); for (int i = 0; i < storageGroupNum; i++) { String storageGroup = BasicStructureSerDeUtil.readString(buffer); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java index a9ead15463..d564bebce6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionGroupsPlan; import org.apache.iotdb.confignode.procedure.StateMachineProcedure; @@ -44,10 +45,7 @@ public class CreateRegionGroupsProcedure private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan(); - /** - * key: TConsensusGroupId - * value: Failed RegionReplicas - */ + /** key: TConsensusGroupId value: Failed RegionReplicas */ private Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>(); public CreateRegionGroupsProcedure() { @@ -58,6 +56,13 @@ public class CreateRegionGroupsProcedure this.createRegionGroupsPlan = createRegionGroupsPlan; } + public CreateRegionGroupsProcedure( + CreateRegionGroupsPlan createRegionGroupsPlan, + Map<TConsensusGroupId, TRegionReplicaSet> failedRegions) { + this.createRegionGroupsPlan = createRegionGroupsPlan; + this.failedRegions = failedRegions; + } + @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionGroupsState state) { switch (state) { @@ -156,16 +161,29 @@ public class CreateRegionGroupsProcedure // must serialize CREATE_REGION_GROUPS.ordinal() firstly stream.writeInt(ProcedureFactory.ProcedureType.CREATE_REGION_GROUPS.ordinal()); super.serialize(stream); - createRegionGroupsPlan.serializeImpl(stream); - - + createRegionGroupsPlan.serializeForProcedure(stream); + stream.writeInt(failedRegions.size()); + failedRegions.forEach( + (groupId, replica) -> { + ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream); + ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream); + }); } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); try { - createRegionGroupsPlan.deserializeImpl(byteBuffer); + createRegionGroupsPlan.deserializeForProcedure(byteBuffer); + failedRegions.clear(); + int failedRegionsSize = byteBuffer.getInt(); + while (failedRegionsSize-- > 0) { + TConsensusGroupId groupId = + ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + TRegionReplicaSet replica = + ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer); + failedRegions.put(groupId, replica); + } } catch (Exception e) { LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e); throw new RuntimeException(e); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java index 9c94ac1fde..dc7acbda82 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveConfigNodeState.java @@ -20,8 +20,7 @@ package org.apache.iotdb.confignode.procedure.state; public enum RemoveConfigNodeState { - REMOVE_CONSENSUS_GROUP, REMOVE_PEER, - + REMOVE_CONSENSUS_GROUP, STOP_CONFIG_NODE } diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java new file mode 100644 index 0000000000..97fd408b47 --- /dev/null +++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedureTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan; +import org.apache.iotdb.tsfile.utils.PublicBAOS; + +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion; +import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class CreateRegionGroupsProcedureTest { + + @Test + public void serializeDeserializeTest() { + TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation(); + dataNodeLocation0.setDataNodeId(5); + dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667)); + dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003)); + dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777)); + dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010)); + dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010)); + + TDataNodeLocation dataNodeLocation1 = new TDataNodeLocation(); + dataNodeLocation1.setDataNodeId(6); + dataNodeLocation1.setClientRpcEndPoint(new TEndPoint("0.0.0.1", 6667)); + dataNodeLocation1.setInternalEndPoint(new TEndPoint("0.0.0.1", 9003)); + dataNodeLocation1.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.1", 8777)); + dataNodeLocation1.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 40010)); + dataNodeLocation1.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.1", 50010)); + + TConsensusGroupId schemaRegionGroupId = new TConsensusGroupId(SchemaRegion, 1); + TConsensusGroupId dataRegionGroupId = new TConsensusGroupId(DataRegion, 0); + + TRegionReplicaSet schemaRegionSet = + new TRegionReplicaSet(schemaRegionGroupId, Collections.singletonList(dataNodeLocation0)); + TRegionReplicaSet dataRegionSet = + new TRegionReplicaSet(dataRegionGroupId, Collections.singletonList(dataNodeLocation1)); + + // to test the equals method of Map<TConsensusGroupId, TRegionReplicaSet> + Map<TConsensusGroupId, TRegionReplicaSet> failedRegions0 = + new HashMap<TConsensusGroupId, TRegionReplicaSet>() { + { + put(dataRegionGroupId, dataRegionSet); + put(schemaRegionGroupId, schemaRegionSet); + } + }; + Map<TConsensusGroupId, TRegionReplicaSet> failedRegions1 = + new HashMap<TConsensusGroupId, TRegionReplicaSet>() { + { + put(schemaRegionGroupId, schemaRegionSet); + put(dataRegionGroupId, dataRegionSet); + } + }; + assertEquals(failedRegions0, failedRegions1); + + CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan(); + createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet); + createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet); + + CreateRegionGroupsProcedure procedure0 = + new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions0); + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + try { + procedure0.serialize(outputStream); + CreateRegionGroupsProcedure procedure1 = new CreateRegionGroupsProcedure(); + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + buffer.getInt(); + procedure1.deserialize(buffer); + assertEquals(procedure0, procedure1); + } catch (IOException e) { + fail(); + } + } +}
