This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0b4014b1e38190cbb458a3091d7a94a585699660 Author: lta <[email protected]> AuthorDate: Thu Apr 29 12:52:31 2021 +0800 This commit fixes all issues of ut tests. --- .../java/org/apache/iotdb/cluster/ClusterMain.java | 3 - .../cluster/client/async/AsyncClientPool.java | 4 +- .../iotdb/cluster/log/snapshot/FileSnapshot.java | 9 ++- .../cluster/query/ClusterDataQueryExecutor.java | 6 +- .../cluster/query/reader/ClusterReaderFactory.java | 64 +++++++++++++++------- .../cluster/query/reader/ClusterTimeGenerator.java | 3 +- .../cluster/server/PullSnapshotHintService.java | 6 +- .../org/apache/iotdb/cluster/server/Response.java | 6 +- .../server/handlers/caller/ElectionHandler.java | 2 +- .../cluster/server/member/DataGroupMember.java | 14 +++-- .../cluster/server/member/MetaGroupMember.java | 27 --------- .../cluster/client/sync/SyncClientAdaptorTest.java | 9 +++ .../iotdb/cluster/common/TestAsyncDataClient.java | 5 +- .../iotdb/cluster/common/TestDataGroupMember.java | 6 +- .../org/apache/iotdb/cluster/common/TestUtils.java | 2 +- .../cluster/log/applier/DataLogApplierTest.java | 2 +- .../cluster/log/applier/MetaLogApplierTest.java | 3 + .../FilePartitionedSnapshotLogManagerTest.java | 8 ++- .../cluster/log/snapshot/FileSnapshotTest.java | 4 +- .../cluster/log/snapshot/PullSnapshotTaskTest.java | 3 +- .../server/heartbeat/DataHeartbeatThreadTest.java | 4 +- .../cluster/server/member/DataGroupMemberTest.java | 40 +++++++++++--- .../cluster/server/member/MetaGroupMemberTest.java | 59 ++++++++++++-------- 23 files changed, 166 insertions(+), 123 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index 44efd58..4d2d176 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@ -253,9 +253,6 @@ public class ClusterMain { logger.error("Cluster size is too small, cannot remove any node"); } else if (response == Response.RESPONSE_REJECT) { logger.error("Node {} is not found in the cluster, please check", nodeToRemove); - } else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) { - logger.warn( - "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later"); } else if (response == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) { logger.warn( "The data migration of the previous membership change operation is not finished. Please try again later"); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java index a7441e4..2a5a637 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java @@ -126,9 +126,7 @@ public class AsyncClientPool { this.wait(waitClientTimeutMS); if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) { logger.warn( - "Cannot get an available client after {}ms, create a new one.", - waitClientTimeutMS, - asyncClientFactory); + "Cannot get an available client after {}ms, create a new one.", waitClientTimeutMS); AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this); nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1); return asyncClient; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java index c8514fa..5e47a66 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java @@ -269,7 +269,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { resource.setMaxPlanIndex(dataGroupMember.getLogManager().getLastLogIndex()); loadRemoteFile(resource); } else { - if (isFileAlreadyPulled(resource)) { + if (!isFileAlreadyPulled(resource)) { loadRemoteFile(resource); } else { // notify the snapshot provider to remove the hardlink @@ -280,10 +280,9 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { throw new PullFileException(resource.getTsFilePath(), resource.getSource(), e); } } - if (isDataMigration) { - // all files are loaded, the slot can be queried without accessing the previous holder - slotManager.setToNull(slot, false); - } + + // all files are loaded, the slot can be queried without accessing the previous holder + slotManager.setToNull(slot, !isDataMigration); logger.info("{}: slot {} is ready", name, slot); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java index de70cb6..9798f3b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java @@ -122,8 +122,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { timeFilter, null, context, - queryPlan.isAscending(), - null); + queryPlan.isAscending()); // combine reader of different partition group of the same path // into a MultManagedMergeReader @@ -176,8 +175,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor { timeFilter, null, context, - queryPlan.isAscending(), - null); + queryPlan.isAscending()); } catch (EmptyIntervalException e) { logger.info(e.getMessage()); return Collections.emptyList(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 41f8faf..f7018d7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -250,8 +250,7 @@ public class ClusterReaderFactory { Filter timeFilter, Filter valueFilter, QueryContext context, - boolean ascending, - Set<Integer> requiredSlots) + boolean ascending) throws StorageEngineException, EmptyIntervalException, QueryProcessException { Map<PartitionGroup, List<PartialPath>> partitionGroupListMap = Maps.newHashMap(); @@ -293,7 +292,7 @@ public class ClusterReaderFactory { valueFilter, context, ascending, - requiredSlots); + null); multPointReaders.add(abstractMultPointReader); } return multPointReaders; @@ -383,8 +382,7 @@ public class ClusterReaderFactory { Filter timeFilter, Filter valueFilter, QueryContext context, - boolean ascending, - Set<Integer> requiredSlots) + boolean ascending) throws StorageEngineException, EmptyIntervalException { // find the groups that should be queried using the timeFilter List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(timeFilter, path); @@ -407,7 +405,7 @@ public class ClusterReaderFactory { context, dataType, ascending, - requiredSlots); + null); mergeReader.addReader(seriesReader, 0); } } catch (IOException | QueryProcessException e) { @@ -961,6 +959,30 @@ public class ClusterReaderFactory { return executorId; } + public IBatchReader getSeriesBatchReader( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + Filter timeFilter, + Filter valueFilter, + QueryContext context, + DataGroupMember dataGroupMember, + boolean ascending, + Set<Integer> requiredSlots) + throws StorageEngineException, QueryProcessException, IOException { + return getSeriesBatchReader( + path, + allSensors, + dataType, + timeFilter, + valueFilter, + context, + dataGroupMember, + ascending, + requiredSlots, + true); + } + /** * Create an IBatchReader of "path" with “timeFilter” and "valueFilter". A synchronization with * the leader will be performed according to consistency level @@ -982,18 +1004,19 @@ public class ClusterReaderFactory { QueryContext context, DataGroupMember dataGroupMember, boolean ascending, - Set<Integer> requiredSlots) + Set<Integer> requiredSlots, + boolean syncLeader) throws StorageEngineException, QueryProcessException, IOException { - // pull the newest data - try { - dataGroupMember.syncLeaderWithConsistencyCheck(false); - } catch (CheckConsistencyException e) { - throw new StorageEngineException(e); + if (syncLeader) { + // pull the newest data + try { + dataGroupMember.syncLeaderWithConsistencyCheck(false); + } catch (CheckConsistencyException e) { + throw new StorageEngineException(e); + } } // find the groups that should be queried due to data migration. - // when a slot is in the status of PULLING or PULLING_WRITABLE, the read of it should merge - // result to guarantee integrity. Map<PartitionGroup, Set<Integer>> holderSlotMap = dataGroupMember.getPreviousHolderSlotMap(); // If requiredSlots is not null, it means that this data group is the previous holder of @@ -1093,20 +1116,19 @@ public class ClusterReaderFactory { for (int i = 0; i < paths.size(); i++) { PartialPath partialPath = paths.get(i); - SeriesReader seriesReader = - getSeriesReader( + IBatchReader batchReader = + getSeriesBatchReader( partialPath, allSensors.get(partialPath.getFullPath()), dataTypes.get(i), timeFilter, valueFilter, context, - dataGroupMember.getHeader(), - dataGroupMember.getRaftGroupId(), + dataGroupMember, ascending, - requiredSlots); - partialPathBatchReaderMap.put( - partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader)); + requiredSlots, + false); + partialPathBatchReaderMap.put(partialPath.getFullPath(), batchReader); } return new MultBatchReader(partialPathBatchReaderMap); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java index a273906..c474912 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java @@ -100,8 +100,7 @@ public class ClusterTimeGenerator extends ServerTimeGenerator { null, filter, context, - queryPlan.isAscending(), - null); + queryPlan.isAscending()); } catch (Exception e) { throw new IOException(e); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java index 67f4b06..c99b59e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java @@ -39,6 +39,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME; + public class PullSnapshotHintService { private static final Logger logger = LoggerFactory.getLogger(PullSnapshotHintService.class); @@ -62,9 +64,9 @@ public class PullSnapshotHintService { return; } - service.shutdown(); + service.shutdownNow(); try { - service.awaitTermination(3, TimeUnit.MINUTES); + service.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("{}: PullSnapshotHintService exiting interrupted", member.getName()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java index 49ffec0..006eec1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java @@ -46,12 +46,10 @@ public class Response { // the new node, which tries to join the cluster, contains conflicted parameters with the // cluster, so the operation is rejected. public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -9; - // add/remove node operations should one by one - public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -10; // the data migration of previous add/remove node operations is not finished. - public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -11; + public static final long RESPONSE_DATA_MIGRATION_NOT_FINISH = -10; // the node has removed from the group, so the operation is rejected. - public static final long RESPONSE_NODE_IS_NOT_IN_GROUP = -12; + public static final long RESPONSE_NODE_IS_NOT_IN_GROUP = -11; // the request is not executed locally anc should be forwarded public static final long RESPONSE_NULL = Long.MIN_VALUE; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java index a6d596b..6190d20 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java @@ -106,7 +106,7 @@ public class ElectionHandler implements AsyncMethodCallback<Long> { // the rejection from a node with a smaller term means the log of this node falls behind logger.info("{}: Election {} rejected: code {}", memberName, currTerm, voterResp); onFail(); - } else if (voterResp != RESPONSE_NODE_IS_NOT_IN_GROUP) { + } else if (voterResp == RESPONSE_NODE_IS_NOT_IN_GROUP) { logger.info("{}: This node has removed from the group", memberName); onFail(); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index f99b6f6..43eaefb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -157,10 +157,12 @@ public class DataGroupMember extends RaftMember { private LastAppliedPatitionTableVersion lastAppliedPartitionTableVersion; @TestOnly - public DataGroupMember() { + public DataGroupMember(PartitionGroup nodes) { // constructor for test + allNodes = nodes; setQueryManager(new ClusterQueryManager()); localQueryExecutor = new LocalQueryExecutor(this); + lastAppliedPartitionTableVersion = new LastAppliedPatitionTableVersion(getMemberDir()); } DataGroupMember( @@ -207,10 +209,6 @@ public class DataGroupMember extends RaftMember { heartBeatService.submit(new DataHeartbeatThread(this)); pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); pullSnapshotHintService = new PullSnapshotHintService(this); - logger.info("{}: has inited pullSnapshotService and pullSnapshotHintService", name); - if (pullSnapshotHintService == null) { - logger.error("{}: pullSnapshotHintService is null", name); - } pullSnapshotHintService.start(); resumePullSnapshotTasks(); } @@ -799,7 +797,7 @@ public class DataGroupMember extends RaftMember { // pull the slots that should be taken over PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor( - removalResult.getRemovedGroup(getRaftGroupId()), slotsToPull, true); + removalResult.getRemovedGroup(getRaftGroupId()), new ArrayList<>(slotsToPull), true); pullFileSnapshot(taskDescriptor, null); } } @@ -891,6 +889,10 @@ public class DataGroupMember extends RaftMember { pullSnapshotHintService.registerHint(descriptor); } + /** + * Find the groups that should be queried due to data migration. When a slot is in the status of + * PULLING or PULLING_WRITABLE, the read of it should merge result to guarantee integrity. + */ public Map<PartitionGroup, Set<Integer>> getPreviousHolderSlotMap() { Map<PartitionGroup, Set<Integer>> holderSlotMap = new HashMap<>(); RaftNode raftNode = new RaftNode(getHeader(), getRaftGroupId()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index ba5e97a..f9f885c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -613,9 +613,6 @@ public class MetaGroupMember extends RaftMember { setNodeIdentifier(genNodeIdentifier()); } else if (resp.getRespNum() == Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) { handleConfigInconsistency(resp); - } else if (resp.getRespNum() == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) { - logger.warn( - "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later"); } else if (resp.getRespNum() == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) { logger.warn( "The data migration of the previous membership change operation is not finished. Please try again later"); @@ -914,14 +911,12 @@ public class MetaGroupMember extends RaftMember { return true; } - boolean nodeExistInPartitionTable = false; for (Node node : partitionTable.getAllNodes()) { if (node.internalIp.equals(newNode.internalIp) && newNode.dataPort == node.dataPort && newNode.metaPort == node.metaPort && newNode.clientPort == node.clientPort) { newNode.nodeIdentifier = node.nodeIdentifier; - nodeExistInPartitionTable = true; break; } } @@ -934,11 +929,6 @@ public class MetaGroupMember extends RaftMember { return true; } - if (!nodeExistInPartitionTable && partitionTable.getAllNodes().size() != allNodes.size()) { - response.setRespNum((int) Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT); - return true; - } - Node idConflictNode = idNodeMap.get(newNode.getNodeIdentifier()); if (idConflictNode != null) { logger.debug("{}'s id conflicts with {}", newNode, idConflictNode); @@ -2056,23 +2046,6 @@ public class MetaGroupMember extends RaftMember { return Response.RESPONSE_REJECT; } - if (partitionTable.getAllNodes().contains(target) - && partitionTable.getAllNodes().size() != allNodes.size()) { - return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT; - } - - // // If it is to remove the leader of meta group, transfer leader authority. - // if (node.equals(thisNode)) { - // logger.info("Remove the leader of meta group, it should step down and transfer - // leadership. Remove node: {}", node); - // setSkipElection(true); - // setCharacter(NodeCharacter.ELECTOR); - // setLeader(null); - // waitLeader(); - // setSkipElection(false); - // return Response.RESPONSE_NULL; - // } - RemoveNodeLog removeNodeLog = new RemoveNodeLog(); // node removal must be serialized to reduce potential concurrency problem synchronized (logManager) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java index 14be669..4b871f3 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java @@ -194,6 +194,15 @@ public class SyncClientAdaptorTest { } @Override + public void getChildNodePathInNextLevel( + Node header, + int raftId, + String path, + AsyncMethodCallback<Set<String>> resultHandler) { + resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3"))); + } + + @Override public void getAllMeasurementSchema( Node header, int raftId, diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java index 269c731..214b3c9 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.thrift.TException; @@ -171,7 +172,9 @@ public class TestAsyncDataClient extends AsyncDataClient { () -> { try { PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes); - planExecutor.processNonQuery(plan); + if (!(plan instanceof LogPlan)) { + planExecutor.processNonQuery(plan); + } resultHandler.onComplete(StatusUtils.OK); } catch (IOException | QueryProcessException diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java index 023084e..6056fff 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java @@ -31,16 +31,14 @@ import java.util.Collections; public class TestDataGroupMember extends DataGroupMember { public TestDataGroupMember() { - super(); + super(new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)))); setQueryManager(new ClusterQueryManager()); this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, ""); - this.allNodes = new PartitionGroup(Collections.singletonList(TestUtils.getNode(0))); } public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) { - super(); + super(allNodes); this.thisNode = thisNode; - this.allNodes = allNodes; this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null, ""); setQueryManager(new ClusterQueryManager()); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java index 0182780..0a33d01 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java @@ -67,7 +67,7 @@ public class TestUtils { public static long TEST_TIME_OUT_MS = 200; - public static ByteBuffer seralizePartitionTable = new SlotPartitionTable(getNode(0)).serialize(); + public static ByteBuffer seralizePartitionTable = getPartitionTable(3).serialize(); private TestUtils() { // util class diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java index 6327da2..163266a 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java @@ -285,7 +285,7 @@ public class DataLogApplierTest extends IoTDBTest { insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(16))); applier.apply(log); assertEquals( - "Storage group is not set for current seriesPath: [root.test16]", + "org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException: Storage group is not set for current seriesPath: [root.test16]", log.getException().getMessage()); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java index c36e948..013403e 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java @@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.applier; import org.apache.iotdb.cluster.common.IoTDBTest; import org.apache.iotdb.cluster.common.TestMetaGroupMember; import org.apache.iotdb.cluster.common.TestUtils; +import org.apache.iotdb.cluster.coordinator.Coordinator; import org.apache.iotdb.cluster.log.LogApplier; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; @@ -80,6 +81,8 @@ public class MetaLogApplierTest extends IoTDBTest { @Test public void testApplyAddNode() { nodes.clear(); + testMetaGroupMember.setCoordinator(new Coordinator()); + testMetaGroupMember.setPartitionTable(TestUtils.getPartitionTable(3)); Node node = new Node("localhost", 1111, 0, 2222, Constants.RPC_PORT, "localhost"); AddNodeLog log = new AddNodeLog(); log.setNewNode(node); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManagerTest.java index 573be2d..6a85895 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManagerTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManagerTest.java @@ -41,6 +41,7 @@ import org.junit.After; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -91,7 +92,12 @@ public class FilePartitionedSnapshotLogManagerTest extends IoTDBTest { PlanExecutor executor = new PlanExecutor(); executor.processNonQuery(plan); - manager.takeSnapshot(); + List<Integer> requireSlots = new ArrayList<>(); + ((SlotPartitionTable) manager.partitionTable) + .getAllNodeSlots() + .values() + .forEach(requireSlots::addAll); + manager.takeSnapshotForSpecificSlots(requireSlots, true); PartitionedSnapshot snapshot = (PartitionedSnapshot) manager.getSnapshot(); for (int i = 1; i < 4; i++) { FileSnapshot fileSnapshot = diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java index dc42183..95124db 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshotTest.java @@ -254,7 +254,7 @@ public class FileSnapshotTest extends DataSnapshotTest { List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet(); assertEquals(10, loadedFiles.size()); for (int i = 0; i < 9; i++) { - assertEquals(i, loadedFiles.get(i).getMaxPlanIndex()); + assertEquals(-1, loadedFiles.get(i).getMaxPlanIndex()); } assertEquals(0, processor.getUnSequenceFileList().size()); } @@ -301,6 +301,6 @@ public class FileSnapshotTest extends DataSnapshotTest { for (int i = 0; i < 9; i++) { assertEquals(i, loadedFiles.get(i).getMaxPlanIndex()); } - assertEquals(0, processor.getUnSequenceFileList().size()); + assertEquals(1, processor.getUnSequenceFileList().size()); } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java index 604751a..d96c982 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java @@ -183,6 +183,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest { } }; sourceMember.setMetaGroupMember(metaGroupMember); + sourceMember.setLogManager(new TestLogManager(0)); sourceMember.setThisNode(TestUtils.getNode(0)); targetMember = new TestDataGroupMember() { @@ -298,7 +299,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest { loadedFiles.get(i).getMaxPlanIndex(), loadedFiles.get(i).getTsFile().getAbsolutePath()); } - assertEquals(i, loadedFiles.get(i).getMaxPlanIndex()); + assertEquals(-1, loadedFiles.get(i).getMaxPlanIndex()); } assertEquals(0, processor.getUnSequenceFileList().size()); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java index ee81577..efb1407 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java @@ -122,8 +122,8 @@ public class DataHeartbeatThreadTest extends HeartbeatThreadTest { () -> { assertEquals(TestUtils.getNode(0), request.getElector()); assertEquals(11, request.getTerm()); - assertEquals(6, request.getLastLogIndex()); - assertEquals(6, request.getLastLogTerm()); + assertEquals(13, request.getLastLogIndex()); + assertEquals(13, request.getLastLogTerm()); if (respondToElection) { resultHandler.onComplete(Response.RESPONSE_AGREE); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index a3e3222..a7c0c3c 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -221,16 +221,24 @@ public class DataGroupMemberTest extends BaseMember { @Override public void pullMeasurementSchema( PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - dataGroupMemberMap.get(request.getHeader()).setCharacter(NodeCharacter.LEADER); - new DataAsyncService(dataGroupMemberMap.get(request.getHeader())) + dataGroupMemberMap + .get(new RaftNode(request.getHeader(), request.getRaftId())) + .setCharacter(NodeCharacter.LEADER); + new DataAsyncService( + dataGroupMemberMap.get( + new RaftNode(request.getHeader(), request.getRaftId()))) .pullMeasurementSchema(request, resultHandler); } @Override public void pullTimeSeriesSchema( PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - dataGroupMemberMap.get(request.getHeader()).setCharacter(NodeCharacter.LEADER); - new DataAsyncService(dataGroupMemberMap.get(request.getHeader())) + dataGroupMemberMap + .get(new RaftNode(request.getHeader(), request.getRaftId())) + .setCharacter(NodeCharacter.LEADER); + new DataAsyncService( + dataGroupMemberMap.get( + new RaftNode(request.getHeader(), request.getRaftId()))) .pullTimeSeriesSchema(request, resultHandler); } @@ -372,7 +380,7 @@ public class DataGroupMemberTest extends BaseMember { testMetaMember.getTerm().set(10); List<Log> metaLogs = TestUtils.prepareTestLogs(6); metaLogManager.append(metaLogs); - Node voteFor = new Node("127.0.0.1", 30000, 0, 40000, Constants.RPC_PORT, "127.0.0.1"); + Node voteFor = TestUtils.getNode(0); Node elector = new Node("127.0.0.1", 30001, 1, 40001, Constants.RPC_PORT + 1, "127.0.0.1"); // a request with smaller term @@ -380,6 +388,7 @@ public class DataGroupMemberTest extends BaseMember { electionRequest.setTerm(1); electionRequest.setLastLogIndex(100); electionRequest.setLastLogTerm(100); + electionRequest.setElector(TestUtils.getNode(0)); TestHandler handler = new TestHandler(); new DataAsyncService(dataGroupMember).startElection(electionRequest, handler); assertEquals(10, handler.getResponse()); @@ -390,6 +399,15 @@ public class DataGroupMemberTest extends BaseMember { new DataAsyncService(dataGroupMember).startElection(electionRequest, handler); assertEquals(Response.RESPONSE_AGREE, handler.getResponse()); + dataGroupMember.setVoteFor(null); + + // a request with same term and voteFor is empty and elector is not in the group + electionRequest.setTerm(10); + electionRequest.setElector(elector); + handler = new TestHandler(); + new DataAsyncService(dataGroupMember).startElection(electionRequest, handler); + assertEquals(Response.RESPONSE_NODE_IS_NOT_IN_GROUP, handler.getResponse()); + dataGroupMember.setVoteFor(voteFor); // a request with same term and voteFor is not empty and elector is not same to voteFor @@ -410,14 +428,16 @@ public class DataGroupMemberTest extends BaseMember { // a request with with larger term and stale data log // should reject election but update term electionRequest.setTerm(14); - electionRequest.setLastLogIndex(100); - electionRequest.setLastLogTerm(100); + electionRequest.setLastLogIndex(1); + electionRequest.setLastLogTerm(1); new DataAsyncService(dataGroupMember).startElection(electionRequest, handler); assertEquals(Response.RESPONSE_LOG_MISMATCH, handler.getResponse()); assertEquals(14, dataGroupMember.getTerm().get()); // a valid request with with larger term electionRequest.setTerm(15); + electionRequest.setLastLogIndex(100); + electionRequest.setLastLogTerm(100); new DataAsyncService(dataGroupMember).startElection(electionRequest, handler); assertEquals(Response.RESPONSE_AGREE, handler.getResponse()); assertEquals(15, dataGroupMember.getTerm().get()); @@ -639,6 +659,7 @@ public class DataGroupMemberTest extends BaseMember { PullSchemaRequest request = new PullSchemaRequest(); request.setPrefixPaths(Collections.singletonList(TestUtils.getTestSg(0))); request.setHeader(TestUtils.getNode(0)); + request.setRaftId(0); AtomicReference<List<TimeseriesSchema>> result = new AtomicReference<>(); PullTimeseriesSchemaHandler handler = new PullTimeseriesSchemaHandler(TestUtils.getNode(1), request.getPrefixPaths(), result); @@ -1067,9 +1088,10 @@ public class DataGroupMemberTest extends BaseMember { dataGroupMember.removeNode(nodeToRemove); assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter()); - assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartbeatReceivedTime()); assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove)); + + dataGroupMember.pullSlots(nodeRemovalResult); List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(new RaftNode(TestUtils.getNode(0), raftId)); while (newSlots.size() != pulledSnapshots.size()) {} @@ -1097,6 +1119,8 @@ public class DataGroupMemberTest extends BaseMember { assertEquals(0, dataGroupMember.getLastHeartbeatReceivedTime()); assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove)); + + dataGroupMember.pullSlots(nodeRemovalResult); List<Integer> newSlots = ((SlotNodeRemovalResult) nodeRemovalResult) .getNewSlotOwners() diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 1700a3c..dac5231 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -130,14 +130,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.cluster.server.NodeCharacter.ELECTOR; import static org.apache.iotdb.cluster.server.NodeCharacter.FOLLOWER; import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER; -import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -151,6 +149,7 @@ public class MetaGroupMemberTest extends BaseMember { private DataClusterServer dataClusterServer; protected boolean mockDataClusterServer; private Node exiledNode; + private final Object waitExileNode = new Object(); private int prevReplicaNum; private List<String> prevSeedNodes; @@ -176,13 +175,7 @@ public class MetaGroupMemberTest extends BaseMember { RaftServer.setReadOperationTimeoutMS(1000); super.setUp(); - partitionTable = - new SlotPartitionTable(allNodes, TestUtils.getNode(0)) { - @Override - public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) { - return new RaftNode(TestUtils.getNode(0), 0); - } - }; + partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)); testMetaMember.setPartitionTable(partitionTable); dummyResponse.set(Response.RESPONSE_AGREE); testMetaMember.setAllNodes(allNodes); @@ -498,7 +491,10 @@ public class MetaGroupMemberTest extends BaseMember { public void exile( ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { System.out.printf("%s was exiled%n", node); - exiledNode = node; + synchronized (waitExileNode) { + exiledNode = node; + waitExileNode.notifyAll(); + } } @Override @@ -527,6 +523,16 @@ public class MetaGroupMemberTest extends BaseMember { }) .start(); } + + @Override + public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) { + new Thread( + () -> { + resultHandler.onComplete( + ClusterUtils.serializeMigrationStatus(Collections.emptyMap())); + }) + .start(); + } }; } catch (IOException e) { return null; @@ -983,8 +989,7 @@ public class MetaGroupMemberTest extends BaseMember { TimeFilter.gtEq(5), ValueFilter.ltEq(8.0), context, - true, - null); + true); assertTrue(reader.hasNextBatch()); BatchData batchData = reader.nextBatch(); for (int j = 5; j < 9; j++) { @@ -1201,16 +1206,16 @@ public class MetaGroupMemberTest extends BaseMember { assertTrue(response.getCheckStatusResponse().isClusterNameEquals()); // cannot add a node due to network failure - dummyResponse.set(Response.RESPONSE_NO_CONNECTION); - testMetaMember.setCharacter(LEADER); - result.set(null); - testMetaMember.setPartitionTable(partitionTable); - new Thread( - () -> { - await().atLeast(200, TimeUnit.MILLISECONDS); - dummyResponse.set(Response.RESPONSE_AGREE); - }) - .start(); + // dummyResponse.set(Response.RESPONSE_NO_CONNECTION); + // testMetaMember.setCharacter(LEADER); + // result.set(null); + // testMetaMember.setPartitionTable(partitionTable); + // new Thread( + // () -> { + // await().atLeast(200, TimeUnit.MILLISECONDS); + // dummyResponse.set(Response.RESPONSE_AGREE); + // }) + // .start(); new MetaAsyncService(testMetaMember) .addNode(TestUtils.getNode(12), TestUtils.getStartUpStatus(), handler); response = result.get(); @@ -1284,7 +1289,6 @@ public class MetaGroupMemberTest extends BaseMember { assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get()); assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(40))); assertEquals(ELECTOR, testMetaMember.getCharacter()); - assertEquals(Long.MIN_VALUE, testMetaMember.getLastHeartbeatReceivedTime()); } @Test @@ -1311,7 +1315,14 @@ public class MetaGroupMemberTest extends BaseMember { assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get()); assertFalse(testMetaMember.getAllNodes().contains(TestUtils.getNode(20))); System.out.println("Checking exiled node in testRemoveNodeAsLeader()"); - assertEquals(TestUtils.getNode(20), exiledNode); + synchronized (waitExileNode) { + try { + waitExileNode.wait(); + } catch (InterruptedException e) { + // ignore + } + assertEquals(TestUtils.getNode(20), exiledNode); + } } @Test
