This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this
push:
new 424accd fix tests
424accd is described below
commit 424accd6b82342439190f295386a806c531e1935
Author: jt2594838 <[email protected]>
AuthorDate: Tue Feb 11 18:05:07 2020 +0800
fix tests
---
.../iotdb/cluster/config/ClusterDescriptor.java | 2 +-
.../iotdb/cluster/partition/PartitionTable.java | 79 +++-----
.../cluster/server/member/MetaGroupMember.java | 30 ++-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 7 +-
.../iotdb/cluster/log/client/ClientPoolTest.java | 105 -----------
.../iotdb/cluster/log/log/LogParserTest.java | 82 ---------
.../log/log/applier/DataLogApplierTest.java | 149 ---------------
.../log/log/applier/MetaLogApplierTest.java | 105 -----------
.../log/log/catchup/LogCatchUpTaskTest.java | 102 -----------
.../log/log/catchup/SnapshotCatchUpTaskTest.java | 146 ---------------
.../cluster/log/log/logtypes/SerializeLogTest.java | 81 ---------
.../FilePartitionedSnapshotLogManagerTest.java | 115 ------------
.../log/log/manage/MemoryLogManagerTest.java | 152 ----------------
.../manage/MetaSingleSnapshotLogManagerTest.java | 80 --------
.../log/log/snapshot/PullSnapshotTaskTest.java | 99 ----------
.../log/log/snapshot/SnapshotSerializeTest.java | 201 ---------------------
.../log/partition/SlotPartitionTableTest.java | 138 --------------
.../log/query/manage/ClusterQueryManagerTest.java | 120 ------------
.../log/query/manage/QueryCoordinatorTest.java | 103 -----------
.../cluster/partition/SlotPartitionTableTest.java | 58 +++---
.../cluster/server/member/MetaGroupMemberTest.java | 3 +-
21 files changed, 97 insertions(+), 1860 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index e791903..d60f9c4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -94,7 +94,7 @@ public class ClusterDescriptor {
Integer.toString(config.getLocalDataPort()))));
config.setLocalClientPort(Integer.parseInt(properties.getProperty("LOCAL_CLIENT_PORT",
- Integer.toString(config.getLocalDataPort()))));
+ Integer.toString(config.getLocalClientPort()))));
config.setMaxConcurrentClientNum(Integer.parseInt(properties.getProperty(
"MAX_CONCURRENT_CLIENT_NUM",
String.valueOf(config.getMaxConcurrentClientNum()))));
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index a6d94d5..676693a 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -25,23 +25,26 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.PartitionUtils;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CountPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
@@ -140,45 +143,11 @@ public interface PartitionTable {
//==============================================================================================//
- //All the follwoing are default methods.
+ //All the following are default methods.
+ // TODO-Cluster: abstract these as QueryRouter
//==============================================================================================//
-
- default int calculateLogSlot(Log log) {
- if (log instanceof PhysicalPlanLog) {
- PhysicalPlanLog physicalPlanLog = ((PhysicalPlanLog) log);
- PhysicalPlan plan = physicalPlanLog.getPlan();
- String storageGroup = null;
- if (plan instanceof CreateTimeSeriesPlan) {
- try {
- storageGroup = getMManager()
- .getStorageGroupNameByPath(((CreateTimeSeriesPlan)
plan).getPath().getFullPath());
- //timestamp is meaningless, use 0 instead.
- return PartitionUtils.calculateStorageGroupSlot(storageGroup, 0,
this.getTotalSlotNumbers());
- } catch (MetadataException e) {
- logger.error("Cannot find the storage group of {}",
((CreateTimeSeriesPlan) plan).getPath());
- return -1;
- }
- } else if (plan instanceof InsertPlan || plan instanceof
BatchInsertPlan) {
- try {
- storageGroup = getMManager()
- .getStorageGroupNameByPath(((InsertPlan) plan).getDeviceId());
- } catch (StorageGroupNotSetException e) {
- logger.error("Cannot find the storage group of {}",
((CreateTimeSeriesPlan) plan).getPath());
- return -1;
- }
- } else if (plan instanceof DeletePlan) {
- //TODO deleteplan may have many SGs.
- logger.error("not implemented for DeletePlan in cluster {}", plan);
- return -1;
- }
-
- return Math.abs(Objects.hash(storageGroup, 0));
- }
- return 0;
- }
-
default PartitionGroup routePlan(PhysicalPlan plan)
- throws UnsupportedPlanException, StorageGroupNotSetException,
IllegalPathException {
+ throws UnsupportedPlanException, StorageGroupNotSetException {
if (plan instanceof InsertPlan) {
return routePlan((InsertPlan) plan);
} else if (plan instanceof CreateTimeSeriesPlan) {
@@ -195,7 +164,7 @@ public interface PartitionTable {
logger.error("{} is a global plan. Please forward it to all
partitionGroups", plan);
}
if (plan.canbeSplit()) {
- logger.error("{} can be split. Please call splitPlanAndMapToGroups");
+ logger.error("{} can be split. Please call splitPlanAndMapToGroups",
plan);
}
throw new UnsupportedPlanException(plan);
}
@@ -210,8 +179,7 @@ public interface PartitionTable {
return partitionByPathTime(plan.getPath().getFullPath(), 0);
}
- default PartitionGroup routePlan(ShowChildPathsPlan plan)
- throws UnsupportedPlanException,StorageGroupNotSetException,
IllegalPathException {
+ default PartitionGroup routePlan(ShowChildPathsPlan plan) {
try {
return
route(getMManager().getStorageGroupNameByPath(plan.getPath().getFullPath()), 0);
} catch (StorageGroupNotSetException e) {
@@ -222,7 +190,7 @@ public interface PartitionTable {
}
default PartitionGroup routePlan(PropertyPlan plan)
- throws UnsupportedPlanException,StorageGroupNotSetException,
IllegalPathException {
+ throws UnsupportedPlanException {
logger.error("PropertyPlan is not implemented");
throw new UnsupportedPlanException(plan);
}
@@ -241,6 +209,8 @@ public interface PartitionTable {
return splitAndRoutePlan((DataAuthPlan) plan);
} else if (plan instanceof ShowDevicesPlan) {
return splitAndRoutePlan((ShowDevicesPlan) plan);
+ } else if (plan instanceof CreateTimeSeriesPlan) {
+ return splitAndRoutePlan((CreateTimeSeriesPlan) plan);
}
//the if clause can be removed after the program is stable
if (PartitionUtils.isLocalPlan(plan)) {
@@ -249,16 +219,22 @@ public interface PartitionTable {
logger.error("{} is a global plan. Please forward it to all
partitionGroups", plan);
}
if (!plan.canbeSplit()) {
- logger.error("{} cannot be split. Please call routePlan");
+ logger.error("{} cannot be split. Please call routePlan", plan);
}
throw new UnsupportedPlanException(plan);
}
+ default Map<PhysicalPlan, PartitionGroup>
splitAndRoutePlan(CreateTimeSeriesPlan plan)
+ throws StorageGroupNotSetException {
+ PartitionGroup partitionGroup =
partitionByPathTime(plan.getPath().getFullPath(), 0);
+ return Collections.singletonMap(plan, partitionGroup);
+ }
+
+ @SuppressWarnings("SuspiciousSystemArraycopy")
default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(BatchInsertPlan
plan)
throws StorageGroupNotSetException {
String storageGroup =
getMManager().getStorageGroupNameByPath(plan.getDeviceId());
Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
- MultiKeyMap<Long, PartitionGroup> timeRangeMapRaftGroup = new
MultiKeyMap<>();
long[] times = plan.getTimes();
if(times.length == 0) {
return Collections.emptyMap();
@@ -382,8 +358,7 @@ public interface PartitionTable {
return result;
}
- default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(DataAuthPlan
plan)
- throws UnsupportedPlanException,StorageGroupNotSetException,
IllegalPathException {
+ default Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(DataAuthPlan
plan) {
//TODO
//why this plan has not Path field?
return null;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ebc606b..523a546 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -99,6 +100,7 @@ import
org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.SerializeUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -115,6 +117,7 @@ import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -243,7 +246,9 @@ public class MetaGroupMember extends RaftMember implements
TSMetaService.AsyncIf
getDataClusterServer().stop();
clientServer.stop();
}
- reportThread.shutdownNow();
+ if (reportThread != null) {
+ reportThread.shutdownNow();
+ }
}
private void initSubServers() throws TTransportException, StartupException {
@@ -934,18 +939,33 @@ public class MetaGroupMember extends RaftMember
implements TSMetaService.AsyncIf
}
logger.debug("{}: The data group of {} is {}", name, plan, planGroupMap);
- TSStatus status = null;
+ TSStatus status;
+ List<Entry<PhysicalPlan, PartitionGroup>> succeededEntries = new
ArrayList<>();
+ List<Integer> errorCodes = new ArrayList<>();
for(Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ TSStatus subStatus;
if (entry.getValue().contains(thisNode)) {
// the query should be handled by a group the local node is in, handle
it with in the group
- TSStatus subStatus =
getDataClusterServer().getDataMember(entry.getValue().getHeader(), null, plan)
+ subStatus =
getDataClusterServer().getDataMember(entry.getValue().getHeader(), null, plan)
.executeNonQuery(entry.getKey());
} else {
// forward the query to the group that should handle it
- TSStatus subStatus = forwardPlan(entry.getKey(), entry.getValue());
+ subStatus = forwardPlan(entry.getKey(), entry.getValue());
+ }
+ if (subStatus.getStatusType().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ errorCodes.add(subStatus.getStatusType().getCode());
+ } else {
+ succeededEntries.add(entry);
}
}
- //TODO merge all sub status together.
+ if (errorCodes.isEmpty()) {
+ status = StatusUtils.OK;
+ } else {
+ status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ status.getStatusType().setMessage("The following errors occurred when
executing the query, "
+ + "please retry or contact the DBA: " + errorCodes.toString());
+ //TODO-Cluster: abort the succeeded ones if necessary.
+ }
return status;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 8a1222d..6bf8b89 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
@@ -66,7 +67,10 @@ public class PartitionUtils {
|| (plan instanceof ShowPlan
&& ((ShowPlan)
plan).getShowContentType().equals(ShowContentType.FLUSH_TASK_INFO))
|| (plan instanceof ShowPlan
- && ((ShowPlan)
plan).getShowContentType().equals(ShowContentType.VERSION));
+ && ((ShowPlan)
plan).getShowContentType().equals(ShowContentType.VERSION))
+ || (plan instanceof ShowPlan
+ && ((ShowPlan) plan).getShowContentType().equals(ShowContentType.TTL))
+ ;
}
/**
@@ -84,6 +88,7 @@ public class PartitionUtils {
|| plan instanceof DeleteTimeSeriesPlan
//delete timeseries plan is global because all nodes may have its
data
|| plan instanceof AuthorPlan
+ || plan instanceof DeleteStorageGroupPlan
;
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/client/ClientPoolTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/client/ClientPoolTest.java
deleted file mode 100644
index 62ab935..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/client/ClientPoolTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.cluster.client.ClientFactory;
-import org.apache.iotdb.cluster.client.ClientPool;
-import org.apache.iotdb.cluster.client.DataClient;
-import org.apache.iotdb.cluster.client.MetaClient;
-import org.apache.iotdb.cluster.common.TestClient;
-import org.apache.iotdb.cluster.common.TestClientFactory;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.Test;
-import org.mockito.Mock;
-
-public class ClientPoolTest {
-
- @Mock
- private ClientFactory testClientFactory;
-
- @Test
- public void testTestClient() throws IOException {
- testClientFactory = new TestClientFactory();
- getClient();
- putClient();
- }
-
- @Test
- public void testDataClient() throws IOException {
- testClientFactory = new DataClient.Factory(new TAsyncClientManager(), new
TBinaryProtocol.Factory());
- getClient();
- putClient();
- }
-
- @Test
- public void testMetaClient() throws IOException {
- testClientFactory = new MetaClient.Factory(new TAsyncClientManager(), new
TBinaryProtocol.Factory());
- getClient();
- putClient();
- }
-
- private void getClient() throws IOException {
- ClientPool clientPool = new ClientPool(testClientFactory);
- for (int i = 0; i < 10; i++) {
- AsyncClient client = clientPool.getClient(TestUtils.getNode(i));
- if (client instanceof TestClient) {
- TestClient testClient = (TestClient) client;
- assertEquals(i, testClient.getSerialNum());
- }
- }
- }
-
- private void putClient() throws IOException {
- ClientPool clientPool = new ClientPool(testClientFactory);
- List<AsyncClient> testClients = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- AsyncClient client = clientPool.getClient(TestUtils.getNode(i));
- testClients.add(client);
- }
- if (testClientFactory instanceof TestClientFactory) {
- for (int i = 0; i < 10; i++) {
- clientPool.putClient(TestUtils.getNode(i), testClients.get(i));
- }
- } else if (testClientFactory instanceof MetaClient.Factory){
- for (AsyncClient testClient : testClients) {
- ((MetaClient) testClient).onComplete();
- }
- } else if (testClientFactory instanceof DataClient.Factory){
- for (AsyncClient testClient : testClients) {
- ((DataClient) testClient).onComplete();
- }
- }
-
- for (int i = 0; i < 10; i++) {
- AsyncClient poolClient = clientPool.getClient(TestUtils.getNode(i));
- assertEquals(testClients.get(i), poolClient);
- }
-
- }
-
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/LogParserTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/LogParserTest.java
deleted file mode 100644
index 7d3b853..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/LogParserTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log;
-
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.junit.Test;
-
-public class LogParserTest {
-
- private LogParser logParser = LogParser.getINSTANCE();
-
- @Test
- public void testAddNodeLog() throws UnknownLogTypeException {
- AddNodeLog log = new AddNodeLog();
- log.setNewNode(TestUtils.getNode(5));
- log.setCurrLogIndex(8);
- log.setCurrLogTerm(8);
- log.setPreviousLogIndex(7);
- log.setPreviousLogTerm(7);
-
- ByteBuffer buffer = log.serialize();
- Log serialized = logParser.parse(buffer);
- assertEquals(log, serialized);
- }
-
- @Test
- public void testPhysicalPlanLog() throws UnknownLogTypeException {
- PhysicalPlanLog log = new PhysicalPlanLog();
- SetStorageGroupPlan setStorageGroupPlan =
- new SetStorageGroupPlan(new Path(TestUtils.getTestSg(5)));
- log.setPlan(setStorageGroupPlan);
- log.setCurrLogIndex(8);
- log.setCurrLogTerm(8);
- log.setPreviousLogIndex(7);
- log.setPreviousLogTerm(7);
-
- ByteBuffer buffer = log.serialize();
- Log serialized = logParser.parse(buffer);
- assertEquals(log, serialized);
- }
-
- @Test
- public void testCloseFileLog() throws UnknownLogTypeException {
- CloseFileLog log = new CloseFileLog(TestUtils.getTestSg(5), false);
- log.setCurrLogIndex(8);
- log.setCurrLogTerm(8);
- log.setPreviousLogIndex(7);
- log.setPreviousLogTerm(7);
-
- ByteBuffer buffer = log.serialize();
- Log serialized = logParser.parse(buffer);
- assertEquals(log, serialized);
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/DataLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/DataLogApplierTest.java
deleted file mode 100644
index 8639820..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/DataLogApplierTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.applier;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.cluster.common.IoTDBTest;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.applier.DataLogApplier;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-
-public class DataLogApplierTest extends IoTDBTest {
-
- private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() {
- @Override
- public List<MeasurementSchema> pullTimeSeriesSchemas(String prefixPath)
- throws StorageGroupNotSetException {
- if (prefixPath.equals(TestUtils.getTestSg(4))) {
- List<MeasurementSchema> ret = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- ret.add(TestUtils.getTestSchema(4, i));
- }
- return ret;
- } else if (prefixPath.equals(TestUtils.getTestSg(5))) {
- return Collections.emptyList();
- } else {
- throw new StorageGroupNotSetException(prefixPath);
- }
- }
- };
-
- private LogApplier applier = new DataLogApplier(testMetaGroupMember);
-
- @Test
- public void testApplyInsert()
- throws QueryProcessException, IOException,
QueryFilterOptimizationException,
- StorageEngineException, MetadataException, SQLException {
- InsertPlan insertPlan = new InsertPlan();
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(insertPlan);
-
- // this series is already created
- insertPlan.setDeviceId(TestUtils.getTestSg(1));
- insertPlan.setTime(1);
- insertPlan.setDataTypes(new TSDataType[]{TSDataType.BOOLEAN});
- insertPlan.setMeasurements(new String[]{TestUtils.getTestMeasurement(0)});
- insertPlan.setValues(new String[]{"1.0"});
- applier.apply(log);
- QueryDataSet dataSet =
query(Collections.singletonList(TestUtils.getTestSeries(1, 0)), null);
- assertTrue(dataSet.hasNext());
- RowRecord record = dataSet.next();
- assertEquals(1, record.getTimestamp());
- assertEquals(1, record.getFields().size());
- assertEquals(1.0, record.getFields().get(0).getDoubleV(), 0.00001);
- assertFalse(dataSet.hasNext());
-
- // this series is not created but can be fetched
- insertPlan.setDeviceId(TestUtils.getTestSg(4));
- applier.apply(log);
- dataSet = query(Collections.singletonList(TestUtils.getTestSeries(4, 0)),
null);
- assertTrue(dataSet.hasNext());
- record = dataSet.next();
- assertEquals(1, record.getTimestamp());
- assertEquals(1, record.getFields().size());
- assertEquals(1.0, record.getFields().get(0).getDoubleV(), 0.00001);
- assertFalse(dataSet.hasNext());
-
- // this series does not exists any where
- insertPlan.setDeviceId(TestUtils.getTestSg(5));
- try {
- applier.apply(log);
- fail("exception should be thrown");
- } catch (QueryProcessException e) {
- assertEquals(
- "org.apache.iotdb.db.exception.metadata.PathNotExistException: Path
[root.test5.s0] does not exist",
- e.getMessage());
- }
-
- // this storage group is not even set
- insertPlan.setDeviceId(TestUtils.getTestSg(6));
- try {
- applier.apply(log);
- fail("exception should be thrown");
- } catch (QueryProcessException e) {
- assertEquals(
- "org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException:
Storage group is not set for current seriesPath: [root.test6]",
- e.getMessage());
- }
- }
-
- @Test
- public void testApplyDeletion()
- throws QueryProcessException, MetadataException,
QueryFilterOptimizationException, StorageEngineException, IOException,
SQLException {
- DeletePlan deletePlan = new DeletePlan();
- deletePlan.setPaths(Collections.singletonList(new
Path(TestUtils.getTestSeries(0, 0))));
- deletePlan.setDeleteTime(50);
- applier.apply(new PhysicalPlanLog(deletePlan));
- QueryDataSet dataSet =
query(Collections.singletonList(TestUtils.getTestSeries(0, 0)), null);
- int cnt = 0;
- while (dataSet.hasNext()) {
- RowRecord record = dataSet.next();
- assertEquals(cnt + 51L, record.getTimestamp());
- assertEquals((cnt + 51) * 1.0, record.getFields().get(0).getDoubleV(),
0.00001);
- cnt++;
- }
- assertEquals(49, cnt);
- }
-
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/MetaLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/MetaLogApplierTest.java
deleted file mode 100644
index 8f736a8..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/applier/MetaLogApplierTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.applier;
-
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertFalse;
-import static junit.framework.TestCase.assertTrue;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.iotdb.cluster.common.IoTDBTest;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
-import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.junit.Test;
-
-public class MetaLogApplierTest extends IoTDBTest {
-
- private Set<Node> nodes = new HashSet<>();
-
- private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() {
- @Override
- public void applyAddNode(Node newNode) {
- nodes.add(newNode);
- }
- };
-
- private LogApplier applier = new MetaLogApplier(testMetaGroupMember);
-
- @Test
- public void testApplyAddNode() throws QueryProcessException {
- nodes.clear();
-
- Node node = new Node("localhost", 1111, 0, 2222);
- AddNodeLog log = new AddNodeLog();
- log.setNewNode(node);
- applier.apply(log);
-
- assertTrue(nodes.contains(node));
- }
-
- @Test
- public void testApplyMetadataCreation() throws QueryProcessException,
MetadataException {
- PhysicalPlanLog physicalPlanLog = new PhysicalPlanLog();
- SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(new
Path("root.applyMeta"));
- physicalPlanLog.setPlan(setStorageGroupPlan);
-
- applier.apply(physicalPlanLog);
- assertTrue(MManager.getInstance().pathExist("root.applyMeta"));
-
- CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new
Path("root.applyMeta"
- + ".s1"), TSDataType.DOUBLE, TSEncoding.RLE, CompressionType.SNAPPY,
- Collections.emptyMap());
- physicalPlanLog.setPlan(createTimeSeriesPlan);
- applier.apply(physicalPlanLog);
- assertTrue(MManager.getInstance().pathExist("root.applyMeta.s1"));
- assertEquals(TSDataType.DOUBLE,
MManager.getInstance().getSeriesType("root.applyMeta.s1"));
- }
-
- @Test
- public void testApplyCloseFile() throws StorageEngineException,
QueryProcessException {
- StorageGroupProcessor storageGroupProcessor =
- StorageEngine.getInstance().getProcessor(TestUtils.getTestSg(0));
-
assertFalse(storageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
-
- CloseFileLog closeFileLog = new CloseFileLog(TestUtils.getTestSg(0), true);
- applier.apply(closeFileLog);
-
assertTrue(storageGroupProcessor.getWorkSequenceTsFileProcessors().isEmpty());
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/LogCatchUpTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/LogCatchUpTaskTest.java
deleted file mode 100644
index 597c330..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/LogCatchUpTaskTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.catchup;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.cluster.common.TestClient;
-import org.apache.iotdb.cluster.common.TestLog;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.catchup.LogCatchUpTask;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.junit.Before;
-import org.junit.Test;
-
-public class LogCatchUpTaskTest {
-
- private List<Log> receivedLogs = new ArrayList<>();
- private Node header = new Node();
- private boolean testLeadershipFlag;
-
- private RaftMember sender = new TestMetaGroupMember() {
- @Override
- public AsyncClient connectNode(Node node) {
- return new TestClient() {
- @Override
- public void appendEntry(AppendEntryRequest request,
- AsyncMethodCallback<Long> resultHandler) {
- new Thread(() -> {
- TestLog testLog = new TestLog();
- testLog.deserialize(request.entry);
- receivedLogs.add(testLog);
- if (testLeadershipFlag && testLog.getCurrLogIndex() == 4) {
- sender.setCharacter(NodeCharacter.ELECTOR);
- }
- resultHandler.onComplete(Response.RESPONSE_AGREE);
- }).start();
- }
- };
- }
-
- @Override
- public Node getHeader() {
- return header;
- }
- };
-
- @Before
- public void setUp() {
- testLeadershipFlag = false;
- }
-
- @Test
- public void testCatchUp() {
- List<Log> logList = TestUtils.prepareTestLogs(10);
- Node receiver = new Node();
- sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender);
- task.run();
-
- assertEquals(logList, receivedLogs);
- }
-
- @Test
- public void testLeadershipLost() {
- testLeadershipFlag = true;
- // the leadership will be lost after sending 5 logs
- List<Log> logList = TestUtils.prepareTestLogs(10);
- Node receiver = new Node();
- sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender);
- task.run();
-
- assertEquals(logList.subList(0, 5), receivedLogs);
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/SnapshotCatchUpTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/SnapshotCatchUpTaskTest.java
deleted file mode 100644
index 34f05a9..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/catchup/SnapshotCatchUpTaskTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.catchup;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.cluster.common.TestClient;
-import org.apache.iotdb.cluster.common.TestLog;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.catchup.LogCatchUpTask;
-import org.apache.iotdb.cluster.log.catchup.SnapshotCatchUpTask;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SnapshotCatchUpTaskTest {
-
- private List<Log> receivedLogs = new ArrayList<>();
- private Snapshot receivedSnapshot;
- private Node header = new Node();
- private boolean testLeadershipFlag;
-
- private RaftMember sender = new TestMetaGroupMember() {
- @Override
- public AsyncClient connectNode(Node node) {
- return new TestClient() {
- @Override
- public void appendEntry(AppendEntryRequest request,
- AsyncMethodCallback<Long> resultHandler) {
- new Thread(() -> {
- TestLog testLog = new TestLog();
- testLog.deserialize(request.entry);
- receivedLogs.add(testLog);
- resultHandler.onComplete(Response.RESPONSE_AGREE);
- }).start();
- }
-
- @Override
- public void sendSnapshot(SendSnapshotRequest request,
AsyncMethodCallback resultHandler) {
- new Thread(() -> {
- receivedSnapshot = new TestSnapshot();
- receivedSnapshot.deserialize(request.snapshotBytes);
- if (testLeadershipFlag) {
- sender.setCharacter(NodeCharacter.ELECTOR);
- }
- resultHandler.onComplete(null);
- }).start();
- }
- };
- }
-
- @Override
- public Node getHeader() {
- return header;
- }
- };
-
- @Before
- public void setUp() {
- testLeadershipFlag = false;
- receivedSnapshot = null;
- receivedLogs.clear();
- }
-
- @Test
- public void testCatchUp() {
- List<Log> logList = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Log log = new TestLog();
- log.setCurrLogIndex(i);
- log.setCurrLogTerm(i);
- log.setPreviousLogIndex(i - 1L);
- log.setPreviousLogTerm(i - 1L);
- logList.add(log);
- }
- Snapshot snapshot = new TestSnapshot(9989);
- Node receiver = new Node();
- sender.setCharacter(NodeCharacter.LEADER);
- SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot,
receiver, sender);
- task.run();
-
- assertEquals(logList, receivedLogs);
- assertEquals(snapshot, receivedSnapshot);
- }
-
- @Test
- public void testLeadershipLost() {
- testLeadershipFlag = true;
- // the leadership will be lost after sending the snapshot
- List<Log> logList = TestUtils.prepareTestLogs(10);
- Snapshot snapshot = new TestSnapshot(9989);
- Node receiver = new Node();
- sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver,
sender);
- task.run();
-
- assertEquals(snapshot, receivedSnapshot);
- assertTrue(receivedLogs.isEmpty());
- }
-
- @Test
- public void testNoLeadership() {
- // the leadership is lost from the beginning
- List<Log> logList = TestUtils.prepareTestLogs(10);
- Snapshot snapshot = new TestSnapshot(9989);
- Node receiver = new Node();
- sender.setCharacter(NodeCharacter.ELECTOR);
- LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver,
sender);
- task.run();
-
- assertNull(receivedSnapshot);
- assertTrue(receivedLogs.isEmpty());
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/logtypes/SerializeLogTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/logtypes/SerializeLogTest.java
deleted file mode 100644
index ddcff42..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/logtypes/SerializeLogTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.logtypes;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.junit.Test;
-
-public class SerializeLogTest {
-
- @Test
- public void testPhysicalPlanLog() throws UnknownLogTypeException {
- PhysicalPlanLog log = new PhysicalPlanLog();
- InsertPlan plan = new InsertPlan();
- plan.setDeviceId("root.d1");
- plan.setMeasurements(new String[]{"s1,s2,s3"});
- plan.setDataTypes(new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64,
TSDataType.TEXT});
- plan.setValues(new String[] {"0.1", "1", "\"dd\""});
- plan.setTime(1);
- log.setPlan(plan);
-
- ByteBuffer byteBuffer = log.serialize();
- Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
- assertEquals(log, logPrime);
-
- log = new PhysicalPlanLog(new SetStorageGroupPlan(new Path("root.sg1")));
- byteBuffer = log.serialize();
- logPrime = LogParser.getINSTANCE().parse(byteBuffer);
- assertEquals(log, logPrime);
- }
-
- @Test
- public void testAddNodeLog() throws UnknownLogTypeException {
- AddNodeLog log = new AddNodeLog();
- log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321));
- ByteBuffer byteBuffer = log.serialize();
- Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
- assertEquals(log, logPrime);
- }
-
- @Test
- public void testCloseFileLog() throws UnknownLogTypeException {
- CloseFileLog log = new CloseFileLog("root.sg1", true);
- ByteBuffer byteBuffer = log.serialize();
- CloseFileLog logPrime = (CloseFileLog)
LogParser.getINSTANCE().parse(byteBuffer);
- assertTrue(logPrime.isSeq());
- assertEquals("root.sg1", logPrime.getStorageGroupName());
- assertEquals(log, logPrime);
- }
-
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/FilePartitionedSnapshotLogManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/FilePartitionedSnapshotLogManagerTest.java
deleted file mode 100644
index dcd1220..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/FilePartitionedSnapshotLogManagerTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.manage;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.iotdb.cluster.RemoteTsFileResource;
-import org.apache.iotdb.cluster.common.IoTDBTest;
-import org.apache.iotdb.cluster.common.TestLogApplier;
-import org.apache.iotdb.cluster.common.TestRemoteFileSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
-import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
-import org.apache.iotdb.cluster.partition.PartitionTable;
-import org.apache.iotdb.cluster.utils.PartitionUtils;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-
-public class FilePartitionedSnapshotLogManagerTest extends IoTDBTest {
-
- @Test
- public void testSnapshot() throws QueryProcessException,
StorageGroupNotSetException {
- PartitionTable partitionTable = TestUtils.getPartitionTable(3);
- LogApplier applier = new TestLogApplier();
- FilePartitionedSnapshotLogManager manager = new
FilePartitionedSnapshotLogManager(applier,
- partitionTable, TestUtils.getNode(0));
-
- List<Log> logs = TestUtils.prepareTestLogs(10);
- for (Log log : logs) {
- manager.appendLog(log);
- }
- manager.commitLog(10);
-
- // create files for sgs
- for (int i = 1; i < 4; i++) {
- String sg = TestUtils.getTestSg(i);
- for (int j = 0; j < 4; j++) {
- // closed files
- prepareData(i, j * 10, 10);
- StorageEngine.getInstance().asyncCloseProcessor(sg, true);
- }
- // un closed files
- prepareData(i, 40, 10);
- }
-
- manager.takeSnapshot();
- PartitionedSnapshot snapshot = (PartitionedSnapshot) manager.getSnapshot();
- for (int i = 1; i < 4; i++) {
- FileSnapshot fileSnapshot =
- (FileSnapshot)
snapshot.getSnapshot(PartitionUtils.calculateStorageGroupSlot(
- TestUtils.getTestSg(i), 0, 100));
- assertEquals(10, fileSnapshot.getTimeseriesSchemas().size());
- assertEquals(5, fileSnapshot.getDataFiles().size());
- }
- }
-
- @Test
- public void testRemoteSnapshots() {
- PartitionTable partitionTable = TestUtils.getPartitionTable(3);
- LogApplier applier = new TestLogApplier();
- FilePartitionedSnapshotLogManager manager = new
FilePartitionedSnapshotLogManager(applier,
- partitionTable, TestUtils.getNode(0));
-
- // fake remote snapshots
- for (int i = 6; i < 9; i++) {
- List<RemoteTsFileResource> resources = new ArrayList<>();
- Set<MeasurementSchema> measurementSchemas = new HashSet<>();
- for (int j = 0; j < 10; j++) {
- RemoteTsFileResource resource = new RemoteTsFileResource();
- resource.setFile(new File(TestUtils.getTestSg(i) + File.separator +
"TsFile" + j));
- resources.add(resource);
- measurementSchemas.add(TestUtils.getTestSchema(i, j));
- }
- manager.setSnapshot(new TestRemoteFileSnapshot(resources,
measurementSchemas), i);
- }
-
- // remote snapshots will be pulled when a new snapshot is taken
- manager.takeSnapshot();
- PartitionedSnapshot snapshot = (PartitionedSnapshot) manager.getSnapshot();
- for (int i = 6; i < 9; i++) {
- FileSnapshot fileSnapshot =
- (FileSnapshot)
snapshot.getSnapshot(PartitionUtils.calculateStorageGroupSlot(TestUtils.getTestSg(i),
0, 100));
- assertEquals(10, fileSnapshot.getDataFiles().size());
- assertEquals(10, fileSnapshot.getTimeseriesSchemas().size());
- }
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MemoryLogManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MemoryLogManagerTest.java
deleted file mode 100644
index 50c047c..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MemoryLogManagerTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.manage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.iotdb.cluster.common.TestLog;
-import org.apache.iotdb.cluster.common.TestLogApplier;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.manage.MemoryLogManager;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MemoryLogManagerTest {
-
- private MemoryLogManager logManager;
- private Set<Log> appliedLogs;
- private LogApplier logApplier = new TestLogApplier() {
- @Override
- public void apply(Log log) {
- appliedLogs.add(log);
- }
- };
-
- @Before
- public void setUp() {
- appliedLogs = new HashSet<>();
- logManager = new MemoryLogManager(logApplier) {
- @Override
- public Snapshot getSnapshot() {
- return null;
- }
-
- @Override
- public void takeSnapshot() {
- // faked class
- }
- };
- }
-
- @Test
- public void testAppend() {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- assertEquals(-1, logManager.getLastLogIndex());
- assertEquals(-1, logManager.getLastLogTerm());
- assertNull(logManager.getLastLog());
- assertFalse(logManager.logValid(5));
-
- for (Log testLog : testLogs) {
- logManager.appendLog(testLog);
- }
- assertEquals(9, logManager.getLastLogIndex());
- assertEquals(9, logManager.getLastLogTerm());
- assertEquals(testLogs.get(9), logManager.getLastLog());
- assertEquals(testLogs.subList(3, 7), logManager.getLogs(3, 7));
- assertTrue(logManager.logValid(5));
- }
-
- @Test
- public void testCommit() throws QueryProcessException {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- assertEquals(-1, logManager.getCommitLogIndex());
- for (Log testLog : testLogs) {
- logManager.appendLog(testLog);
- }
- assertEquals(-1, logManager.getCommitLogIndex());
- logManager.commitLog(8);
- assertEquals(8, logManager.getCommitLogIndex());
- assertTrue(appliedLogs.containsAll(testLogs.subList(0, 9)));
-
- logManager.commitLog(testLogs.get(9));
- assertEquals(9, logManager.getCommitLogIndex());
- assertTrue(appliedLogs.containsAll(testLogs));
-
- logManager.commitLog(1);
- assertEquals(9, logManager.getCommitLogIndex());
- assertTrue(appliedLogs.containsAll(testLogs));
- }
-
- @Test
- public void testSet() {
- assertEquals(-1, logManager.getLastLogIndex());
- assertEquals(-1, logManager.getLastLogTerm());
- logManager.setLastLogId(9);
- logManager.setLastLogTerm(9);
- assertEquals(9, logManager.getLastLogIndex());
- assertEquals(9, logManager.getLastLogTerm());
-
- assertSame(logApplier, logManager.getApplier());
- assertEquals(Collections.emptyList(), logManager.getLogs(100, 2000));
- assertEquals(Collections.emptyList(), logManager.getLogs(2000, 100));
- }
-
- @Test
- public void testRemove() {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- for (Log testLog : testLogs) {
- logManager.appendLog(testLog);
- }
- for (int i = 0; i < 3; i++) {
- logManager.removeLastLog();
- }
- assertEquals(testLogs.subList(0, 7), logManager.getLogs(0, 7));
- }
-
- @Test
- public void testReplace() {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- for (Log testLog : testLogs) {
- logManager.appendLog(testLog);
- }
-
- Log log = new TestLog();
- log.setPreviousLogTerm(8);
- log.setPreviousLogIndex(8);
- log.setCurrLogTerm(100);
- log.setCurrLogIndex(9);
- assertNotEquals(log, logManager.getLastLog());
- logManager.replaceLastLog(log);
- assertEquals(log, logManager.getLastLog());
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MetaSingleSnapshotLogManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MetaSingleSnapshotLogManagerTest.java
deleted file mode 100644
index 94357f7..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/manage/MetaSingleSnapshotLogManagerTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.manage;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import org.apache.iotdb.cluster.common.IoTDBTest;
-import org.apache.iotdb.cluster.common.TestLogApplier;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
-import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.SimpleSnapshot;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MetaSingleSnapshotLogManagerTest extends IoTDBTest {
-
- private MetaSingleSnapshotLogManager logManager =
- new MetaSingleSnapshotLogManager(new TestLogApplier());
-
- @Override
- @Before
- public void setUp() throws QueryProcessException, StartupException {
- super.setUp();
- logManager =
- new MetaSingleSnapshotLogManager(new TestLogApplier());
- }
-
- @Test
- public void testTakeSnapshot() {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- for (Log testLog : testLogs) {
- logManager.appendLog(testLog);
- }
- logManager.commitLog(4);
-
- logManager.takeSnapshot();
- MetaSimpleSnapshot snapshot = (MetaSimpleSnapshot)
logManager.getSnapshot();
- List<String> storageGroups = snapshot.getStorageGroups();
- assertEquals(10, storageGroups.size());
- for (int i = 0; i < 10; i++) {
- assertEquals(TestUtils.getTestSg(i), storageGroups.get(i));
- }
- assertEquals(testLogs.subList(0, 5), snapshot.getSnapshot());
- assertEquals(4, snapshot.getLastLogId());
- assertEquals(4, snapshot.getLastLogTerm());
- }
-
- @Test
- public void testSetSnapshot() {
- List<Log> testLogs = TestUtils.prepareTestLogs(10);
- SimpleSnapshot simpleSnapshot = new SimpleSnapshot(testLogs);
- logManager.setSnapshot(simpleSnapshot);
-
- MetaSimpleSnapshot snapshot = (MetaSimpleSnapshot)
logManager.getSnapshot();
- assertEquals(testLogs, snapshot.getSnapshot());
- }
-
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
deleted file mode 100644
index 55196a3..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.cluster.client.ClientPool;
-import org.apache.iotdb.cluster.client.DataClient;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PullSnapshotTaskTest {
-
- private ClientPool clientPool = new ClientPool(null);
- private Map<Integer, Snapshot> snapshotMap = new HashMap<>();
- private DataGroupMember newMember = new DataGroupMember() {
- @Override
- public AsyncClient connectNode(Node node) {
- try {
- return new DataClient(new TBinaryProtocol.Factory(), new
TAsyncClientManager(),
- node, clientPool) {
- @Override
- public void pullSnapshot(PullSnapshotRequest request,
- AsyncMethodCallback<PullSnapshotResp> resultHandler) {
- new Thread(() -> {
- PullSnapshotResp resp = new PullSnapshotResp();
- Map<Integer, ByteBuffer> snapshotBytes = new HashMap<>();
- for (Entry<Integer, Snapshot> entry : snapshotMap.entrySet()) {
- snapshotBytes.put(entry.getKey(),
entry.getValue().serialize());
- }
- resp.setSnapshotBytes(snapshotBytes);
- resultHandler.onComplete(resp);
- }).start();
- }
- };
- } catch (IOException e) {
- return null;
- }
- }
- };
-
- @Before
- public void setUp() {
- for (int i = 0; i < 10; i++) {
- snapshotMap.put(i, new TestSnapshot(i));
- }
- }
-
- @Test
- public void test() {
- List<Integer> slots = new ArrayList<>();
- List<Node> previousHolders = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- slots.add(i);
- previousHolders.add(TestUtils.getNode(i + 1));
- }
-
- PullSnapshotTask<TestSnapshot> task = new
PullSnapshotTask<TestSnapshot>(TestUtils.getNode(0), slots,
- newMember, previousHolders, TestSnapshot::new, false);
- Map<Integer, TestSnapshot> result = task.call();
- assertEquals(snapshotMap, result);
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/SnapshotSerializeTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/SnapshotSerializeTest.java
deleted file mode 100644
index fd840ca..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/SnapshotSerializeTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.SimpleSnapshot;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-
-public class SnapshotSerializeTest {
-
- @Test
- public void testFileSnapshot() {
- Set<MeasurementSchema> measurementSchemaList = new HashSet<>();
- FileSnapshot snapshot = new FileSnapshot();
- for (int i = 0; i < 10; i++) {
- measurementSchemaList.add(TestUtils.getTestSchema(i, i));
- TsFileResource tsFileResource = new TsFileResource(new File("TsFile" +
i));
- tsFileResource.setHistoricalVersions(Collections.singleton((long) i));
- snapshot.addFile(tsFileResource, TestUtils.getNode(i));
- }
- snapshot.setTimeseriesSchemas(measurementSchemaList);
-
- ByteBuffer byteBuffer = snapshot.serialize();
- FileSnapshot deserializedSnapshot = new FileSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
- @Test
- public void testMetaSimpleSnapshot() {
- List<Log> logs = new ArrayList<>();
- List<String> sgs = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
- CreateTimeSeriesPlan createTimeSeriesPlan =
- new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()),
schema.getType(),
- schema.getEncodingType(), schema.getCompressor(),
schema.getProps());
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(createTimeSeriesPlan);
- logs.add(log);
- log.setPreviousLogTerm(i - 1);
- log.setPreviousLogIndex(i - 1);
- log.setCurrLogIndex(i);
- log.setCurrLogTerm(i);
- sgs.add(TestUtils.getTestSg(i));
- }
- MetaSimpleSnapshot snapshot = new MetaSimpleSnapshot(logs, sgs);
- snapshot.setLastLogId(10);
- snapshot.setLastLogTerm(10);
-
- ByteBuffer byteBuffer = snapshot.serialize();
- MetaSimpleSnapshot deserializedSnapshot = new MetaSimpleSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
- @Test
- public void testPartitionedSnapshot() {
- PartitionedSnapshot<TestSnapshot> snapshot = new
PartitionedSnapshot<>(TestSnapshot::new);
- snapshot.setLastLogTerm(10);
- snapshot.setLastLogId(10);
-
- Map<Integer, Snapshot> slotSnapshots = new HashMap<>();
- List<Integer> slots = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- TestSnapshot s = new TestSnapshot();
- slotSnapshots.put(i, s);
- snapshot.putSnapshot(i, s);
- slots.add(i);
- }
-
- for (Integer slot : slots) {
- TestSnapshot s = (TestSnapshot) snapshot.getSnapshot(slot);
- assertTrue(slotSnapshots.values().contains(s));
- }
-
- PartitionedSnapshot subSnapshots = snapshot.getSubSnapshots(slots);
- for (Integer slot : slots) {
- TestSnapshot s = (TestSnapshot) subSnapshots.getSnapshot(slot);
- assertTrue(slotSnapshots.values().contains(s));
- }
-
- ByteBuffer byteBuffer = snapshot.serialize();
- PartitionedSnapshot deserializedSnapshot = new
PartitionedSnapshot(TestSnapshot::new);
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
-
- @Test
- public void testRemoteFileSnapshot() {
- AtomicBoolean remoteSnapshotGet = new AtomicBoolean(false);
- RemoteFileSnapshot snapshot = new RemoteFileSnapshot(
- new Future<Void>() {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public Void get() {
- remoteSnapshotGet.set(true);
- return null;
- }
-
- @Override
- public Void get(long timeout, TimeUnit unit) {
- remoteSnapshotGet.set(true);
- return null;
- }
- });
-
- assertNull(snapshot.serialize());
- snapshot.getRemoteSnapshot();
- assertTrue(remoteSnapshotGet.get());
- }
-
- @Test
- public void testSimpleSnapshot() {
- List<Log> logs = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
- CreateTimeSeriesPlan createTimeSeriesPlan =
- new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()),
schema.getType(),
- schema.getEncodingType(), schema.getCompressor(),
schema.getProps());
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(createTimeSeriesPlan);
- logs.add(log);
- log.setPreviousLogTerm(i - 1);
- log.setPreviousLogIndex(i - 1);
- log.setCurrLogIndex(i);
- log.setCurrLogTerm(i);
- }
- SimpleSnapshot snapshot = new SimpleSnapshot(new
ArrayList<>(logs.subList(0, 5)));
- snapshot.setLastLogId(10);
- snapshot.setLastLogTerm(10);
- for (Log log : logs.subList(5, logs.size())) {
- snapshot.add(log);
- }
-
- ByteBuffer byteBuffer = snapshot.serialize();
- SimpleSnapshot deserializedSnapshot = new SimpleSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
deleted file mode 100644
index 9bc08cf..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.log.partition;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.partition.NodeRemovalResult;
-import org.apache.iotdb.cluster.partition.PartitionGroup;
-import org.apache.iotdb.cluster.partition.SlotPartitionTable;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SlotPartitionTableTest {
-
- SlotPartitionTable table;
- int replica_size = 5;
-
- @Before
- public void setUp() {
- List<Node> nodes = new ArrayList<>();
- IntStream.range(0, 20).forEach(i -> nodes.add(getNode(i)));
-
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(replica_size);
- table = new SlotPartitionTable(nodes, nodes.get(3));
- }
-
- private Node getNode(int i) {
- return new Node("localhost", 30000 + i, i, 40000 + i);
- }
-
- @After
- public void tearDown() {
- ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(3);
- }
-
- @Test
- public void getHeaderGroup() {
- Arrays.stream(new int[]{10, 15, 19}).forEach( i -> {
- int last = (i + replica_size - 1) % 20;
- assertGetHeaderGroup(i, last);
- });
- }
-
- private void assertGetHeaderGroup(int start, int last) {
- PartitionGroup group = table.getHeaderGroup(new Node("localhost", 30000 +
start, start, 40000 + start));
- assertEquals(replica_size, group.size());
- assertEquals(new Node("localhost", 30000 + start, start, 40000 + start),
group.getHeader());
- assertEquals(
- new Node("localhost", 30000 + last, last, 40000 + last),
- group.get(replica_size - 1));
- }
-
- @Test
- public void route() {
- table.route("root.sg1", 1);
- }
-
- @Test
- public void addNode() {
- String a = "中国";
- System.out.println(a.length());
- }
-
- @Test
- public void getLocalGroups() {
- }
-
- @Test
- public void serialize() {
- }
-
- @Test
- public void deserialize() {
- }
-
- @Test
- public void getAllNodes() {
- }
-
- @Test
- public void getPreviousNodeMap() {
- }
-
- @Test
- public void getNodeSlots() {
- }
-
- @Test
- public void getAllNodeSlots() {
- }
-
- @Test
- public void testRemoveNode() {
- List<Integer> nodeSlots = table.getNodeSlots(getNode(0));
- NodeRemovalResult nodeRemovalResult = table.removeNode(getNode(0));
- assertFalse(table.getAllNodes().contains(getNode(0)));
- PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
- for (int i = 0; i < 5; i++) {
- assertTrue(removedGroup.contains(getNode(i)));
- }
- PartitionGroup newGroup = nodeRemovalResult.getNewGroup();
- for (int i : new int[] {18, 19, 1, 2, 3}) {
- assertTrue(newGroup.contains(getNode(i)));
- }
- // the slots owned by the removed one should be redistributed to other
nodes
- Map<Node, List<Integer>> newSlotOwners =
nodeRemovalResult.getNewSlotOwners();
- for (List<Integer> slots : newSlotOwners.values()) {
- assertTrue(nodeSlots.containsAll(slots));
- nodeSlots.removeAll(slots);
- }
- assertTrue(nodeSlots.isEmpty());
- }
-}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/ClusterQueryManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/ClusterQueryManagerTest.java
deleted file mode 100644
index 125d66f..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/ClusterQueryManagerTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.query.manage;
-
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.query.RemoteQueryContext;
-import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ClusterQueryManagerTest {
-
- private ClusterQueryManager queryManager;
-
- @Before
- public void setUp() {
- queryManager = new ClusterQueryManager();
- }
-
- @Test
- public void testContext() {
- RemoteQueryContext queryContext1 =
queryManager.getQueryContext(TestUtils.getNode(0), 1);
- RemoteQueryContext queryContext2 =
queryManager.getQueryContext(TestUtils.getNode(0), 1);
- RemoteQueryContext queryContext3 =
queryManager.getQueryContext(TestUtils.getNode(1), 1);
- assertSame(queryContext1, queryContext2);
- assertNotEquals(queryContext2, queryContext3);
- }
-
- @Test
- public void testRegisterReader() {
- IBatchReader reader = new IBatchReader() {
- @Override
- public boolean hasNextBatch() {
- return false;
- }
-
- @Override
- public BatchData nextBatch() {
- return null;
- }
-
- @Override
- public void close() {
-
- }
- };
- long id = queryManager.registerReader(reader);
- assertSame(reader, queryManager.getReader(id));
- }
-
- @Test
- public void testRegisterReaderByTime() {
- IReaderByTimestamp reader = new IReaderByTimestamp() {
- @Override
- public Object getValueInTimestamp(long timestamp) {
- return null;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
- };
- long id = queryManager.registerReaderByTime(reader);
- assertSame(reader, queryManager.getReaderByTimestamp(id));
- }
-
- @Test
- public void testEndQuery() throws StorageEngineException {
- RemoteQueryContext queryContext =
queryManager.getQueryContext(TestUtils.getNode(0), 1);
- for (int i = 0; i < 10; i++) {
- IBatchReader reader = new IBatchReader() {
- @Override
- public boolean hasNextBatch() {
- return false;
- }
-
- @Override
- public BatchData nextBatch() {
- return null;
- }
-
- @Override
- public void close() {
-
- }
- };
- queryContext.registerLocalReader(queryManager.registerReader(reader));
- }
- queryManager.endQuery(TestUtils.getNode(0), 1);
- for (int i = 0; i < 10; i++) {
- assertNull(queryManager.getReader(i));
- }
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/QueryCoordinatorTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/QueryCoordinatorTest.java
deleted file mode 100644
index fad590b..0000000
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/query/manage/QueryCoordinatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.query.manage;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.cluster.common.TestMetaClient;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.query.manage.NodeStatus;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.junit.Before;
-import org.junit.Test;
-
-public class QueryCoordinatorTest {
-
- private MetaGroupMember metaGroupMember;
- private TestMetaClient metaClient;
- private Map<Node, NodeStatus> nodeStatusMap;
- private Map<Node, Long> nodeLatencyMap;
- private QueryCoordinator coordinator = QueryCoordinator.getINSTANCE();
-
- @Before
- public void setUp() throws IOException {
- nodeStatusMap = new HashMap<>();
- nodeLatencyMap = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- NodeStatus status = new NodeStatus();
- TNodeStatus nodeStatus = new TNodeStatus();
- status.setStatus(nodeStatus);
- status.setLastResponseLatency(i);
- Node node = TestUtils.getNode(i);
- nodeStatusMap.put(node, status);
- // nodes with smaller num has lower latency
- nodeLatencyMap.put(node, i * 10L);
- }
- metaClient = new TestMetaClient(new Factory(), new TAsyncClientManager(),
TestUtils.getNode(0),
- null) {
- @Override
- public void queryNodeStatus(AsyncMethodCallback<TNodeStatus>
resultHandler) {
- new Thread(() -> {
- try {
- Thread.sleep(nodeLatencyMap.get(getNode()));
- } catch (InterruptedException e) {
- // ignored
- }
- resultHandler.onComplete(nodeStatusMap.get(getNode()).getStatus());
- }).start();
- }
- };
-
- metaGroupMember = new MetaGroupMember() {
- @Override
- public AsyncClient connectNode(Node node) {
- metaClient.setNode(node);
- return metaClient;
- }
- };
- coordinator.setMetaGroupMember(metaGroupMember);
- }
-
- @Test
- public void test() {
- List<Node> orderedNodes = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- orderedNodes.add(TestUtils.getNode(i));
- }
- List<Node> unorderedNodes = new ArrayList<>(orderedNodes);
- Collections.shuffle(unorderedNodes);
-
- List<Node> reorderedNodes = coordinator.reorderNodes(unorderedNodes);
- assertEquals(orderedNodes, reorderedNodes);
- }
-}
\ No newline at end of file
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 696431c..a32a86a 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -25,10 +25,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -43,13 +41,10 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.PartitionUtils;
-import org.apache.iotdb.cluster.utils.nodetool.function.Partition;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
@@ -85,7 +80,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +98,7 @@ public class SlotPartitionTableTest {
public void setUp() throws MetadataException {
MManager.getInstance().init();
nodes = new ArrayList<>();
- IntStream.range(0, 20).forEach(i -> nodes.add(new Node("localhost", 30000
+ i, i, 40000 + i)));
+ IntStream.range(0, 20).forEach(i -> nodes.add(getNode(i)));
localNode = nodes.get(3);
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(replica_size);
tables = new SlotPartitionTable[20];
@@ -162,11 +156,14 @@ public class SlotPartitionTableTest {
manager.clear();
}
}
- for (File file : new File("target/schemas").listFiles()) {
- try {
- Files.delete(file.toPath());
- } catch (IOException e) {
- logger.error("{} can not be deleted.", file, e);
+ File[] files = new File("target/schemas").listFiles();
+ if (files != null) {
+ for (File file : files) {
+ try {
+ Files.delete(file.toPath());
+ } catch (IOException e) {
+ logger.error("{} can not be deleted.", file, e);
+ }
}
}
}
@@ -262,7 +259,7 @@ public class SlotPartitionTableTest {
ByteBuffer buffer = localTable.serialize();
SlotPartitionTable tmpTable = new SlotPartitionTable(new Node());
tmpTable.deserialize(buffer);
- assertTrue(localTable.equals(tmpTable));
+ assertEquals(localTable, tmpTable);
}
@Test
@@ -293,10 +290,6 @@ public class SlotPartitionTableTest {
}
@Test
- public void getMManager() {
- //no meanningful
- }
-
public void testPhysicalPlan() {
PhysicalPlan aggregationPlan = new AggregationPlan();
assertTrue(PartitionUtils.isLocalPlan(aggregationPlan));
@@ -313,7 +306,7 @@ public class SlotPartitionTableTest {
localTable.routePlan(updatePlan);
} catch (UnsupportedPlanException e) {
//success
- } catch (StorageGroupNotSetException | IllegalPathException e) {
+ } catch (StorageGroupNotSetException e) {
fail(e.getMessage());
}
try {
@@ -323,8 +316,6 @@ public class SlotPartitionTableTest {
e.printStackTrace();
fail(e.getMessage());
}
- PhysicalPlan dataAuthPlan = new DataAuthPlan(OperatorType.AUTHOR,
Collections.emptyList());
- assertTrue(PartitionUtils.isGlobalPlan(dataAuthPlan));
PhysicalPlan deleteStorageGroup = new
DeleteStorageGroupPlan(Collections.emptyList());
assertTrue(PartitionUtils.isGlobalPlan(deleteStorageGroup));
PhysicalPlan loadConfigPlan = new LoadConfigurationPlan();
@@ -336,7 +327,7 @@ public class SlotPartitionTableTest {
localTable.routePlan(propertyPlan);
} catch (UnsupportedPlanException e) {
//success
- } catch (StorageGroupNotSetException | IllegalPathException e) {
+ } catch (StorageGroupNotSetException e) {
fail(e.getMessage());
}
PhysicalPlan setStorageGroupPlan = new SetStorageGroupPlan();
@@ -514,4 +505,29 @@ public class SlotPartitionTableTest {
}
}
+ private Node getNode(int i) {
+ return new Node("localhost", 30000 + i, i, 40000 + i);
+ }
+
+ @Test
+ public void testRemoveNode() {
+ List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0));
+ NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
+ assertFalse(localTable.getAllNodes().contains(getNode(0)));
+ PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
+ for (int i = 0; i < 5; i++) {
+ assertTrue(removedGroup.contains(getNode(i)));
+ }
+ PartitionGroup newGroup = nodeRemovalResult.getNewGroup();
+ for (int i : new int[] {18, 19, 1, 2, 3}) {
+ assertTrue(newGroup.contains(getNode(i)));
+ }
+ // the slots owned by the removed one should be redistributed to other
nodes
+ Map<Node, List<Integer>> newSlotOwners =
nodeRemovalResult.getNewSlotOwners();
+ for (List<Integer> slots : newSlotOwners.values()) {
+ assertTrue(nodeSlots.containsAll(slots));
+ nodeSlots.removeAll(slots);
+ }
+ assertTrue(nodeSlots.isEmpty());
+ }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index f96af5c..767ea81 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -103,12 +103,11 @@ import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TCompactProtocol.Factory;
import org.apache.thrift.transport.TTransportException;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class MetaGroupMemberTest extends MemberTest {