This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 229fd03 modify some impl of query timeseries
229fd03 is described below
commit 229fd030b37a2d883c0f6c86669865265e919c92
Author: lta <[email protected]>
AuthorDate: Sat Mar 30 19:01:54 2019 +0800
modify some impl of query timeseries
---
.../org/apache/iotdb/cluster/callback/Task.java | 4 ++
.../apache/iotdb/cluster/config/ClusterConfig.java | 45 +++++++++++++++-------
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 13 ++++++-
.../cluster/qp/executor/NonQueryExecutor.java | 2 +-
.../cluster/qp/executor/QueryMetadataExecutor.java | 19 +++++++--
.../rpc/request/QueryTimeSeriesRequest.java | 1 -
.../cluster/rpc/service/TSServiceClusterImpl.java | 3 +-
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 28 +++++++++-----
.../apache/iotdb/cluster/utils/hash/Router.java | 13 ++++++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 6 +--
10 files changed, 95 insertions(+), 39 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 0c73a01..7940a5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -33,6 +33,7 @@ public abstract class Task {
* Task request
*/
protected BasicRequest request;
+
/**
* Whether it's a synchronization task or not.
*/
@@ -112,6 +113,9 @@ public abstract class Task {
INITIAL, REDIRECT, FINISH, EXCEPTION
}
+ /**
+ * Wait until task is finished.
+ */
public void await() throws InterruptedException {
this.taskCountDownLatch.await();
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 67e176b..83b129f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -38,40 +38,57 @@ public class ClusterConfig {
private static final String DEFAULT_RAFT_LOG_DIR = "log";
private static final String DEFAULT_RAFT_SNAPSHOT_DIR = "snapshot";
-
- // Cluster node: {ip1,ip2,...,ipn}
+ /**
+ * Cluster node: {ip1,ip2,...,ipn}
+ */
private String[] nodes = {DEFAULT_NODE};
- // Replication number
+ /**
+ * Replication number
+ */
private int replication = 3;
- private String ip = null;
+ private String ip = "127.0.0.1";
+
private int port = 8888;
- // Path for holder to store raft log
+ /**
+ * Path for holder to store raft log
+ */
private String raftLogPath;
- // Path for holder to store raft snapshot
+ /**
+ * Path for holder to store raft snapshot
+ */
private String raftSnapshotPath;
- // Path for holder to store raft metadata
+ /**
+ * Path for holder to store raft metadata
+ */
private String raftMetadataPath;
- // When the number of the difference between
- // leader and follower log is less than this value, it is considered as
'catch-up'
+ /**
+ * When the number of the difference between leader and follower log is less
than this value, it
+ * is considered as 'catch-up'
+ */
private int maxCatchUpLogNum = 100000;
- // Whether to enable the delayed snapshot mechanism or not
+ /**
+ * Whether to enable the delayed snapshot mechanism or not
+ */
private boolean delaySnapshot = false;
- // Maximum allowed delay hours
+
+ /**
+ * Maximin allowed delay hours
+ */
private int delayHours = 24;
/**
- * count limit to redo a single task
+ * Count limit to redo a single task
**/
private int taskRedoCount = 3;
/**
- * timeout limit for a single task, the unit is milliseconds
+ * Timeout limit for a single task, the unit is milliseconds
**/
private int taskTimeoutMs = 1000;
@@ -81,7 +98,7 @@ public class ClusterConfig {
// empty constructor
}
- public void updatePath(){
+ public void updatePath() {
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
String iotdbDataDir = conf.getDataDir();
iotdbDataDir = FilePathUtils.regularizePath(iotdbDataDir);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index f100f1d..ab9f153 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -46,6 +46,7 @@ public abstract class ClusterQPExecutor {
CLUSTER_CONFIG.getPort());
protected OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
protected MManager mManager = MManager.getInstance();
+
/**
* Rpc Service Client
*/
@@ -55,6 +56,7 @@ public abstract class ClusterQPExecutor {
* Count limit to redo a single task
*/
protected static final int TASK_MAX_RETRY =
CLUSTER_CONFIG.getTaskRedoCount();
+
/**
* Number of subtask in task segmentation
*/
@@ -81,10 +83,10 @@ public abstract class ClusterQPExecutor {
}
/**
- * Verify if the command can execute in local. 1. If this node belongs to
the storage group 2. If
+ * Verify if the non query command can execute in local. 1. If this node
belongs to the storage group 2. If
* this node is leader.
*/
- public boolean canHandle(String storageGroup) {
+ public boolean canHandleNonQuery(String storageGroup) {
if (router.containPhysicalNode(storageGroup, localNode)) {
String groupId = getGroupIdBySG(storageGroup);
if
(RaftUtils.convertPeerId(RaftUtils.getTargetPeerID(groupId)).equals(localNode))
{
@@ -95,6 +97,13 @@ public abstract class ClusterQPExecutor {
}
/**
+ * Verify if the query command can execute in local. Check if this node
belongs to the storage group
+ */
+ public boolean canHandleQuery(String storageGroup) {
+ return router.containPhysicalNode(storageGroup, localNode);
+ }
+
+ /**
* Async handle task by task and leader id
*
* @param task request task
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 6bc92f5..a95a227 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -182,7 +182,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
private boolean handleRequest(String storageGroup, PhysicalPlan plan)
throws ProcessorException, IOException, RaftConnectionException,
InterruptedException {
/** Check if the plan can be executed locally. **/
- if (canHandle(storageGroup)) {
+ if (canHandleNonQuery(storageGroup)) {
return qpExecutor.processNonQuery(plan);
} else {
String groupId = getGroupIdBySG(storageGroup);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 71c88d1..4560973 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -75,11 +75,11 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
String storageGroup = getStroageGroupByDevice(path);
String groupId = getGroupIdBySG(storageGroup);
QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ PeerId leader = RaftUtils.getRandomPeerID(groupId);
SingleTask task = new SingleTask(false, request);
/** Check if the plan can be executed locally. **/
- if (canHandle(storageGroup)) {
+ if (canHandleQuery(storageGroup)) {
return queryTimeSeriesLocally(path, groupId, task);
} else {
try {
@@ -91,17 +91,23 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
}
}
+ /**
+ * Handle "show timeseries <path>" statement
+ *
+ * @param path column path
+ */
private List<List<String>> queryTimeSeriesLocally(String path, String
groupId, SingleTask task)
throws InterruptedException {
final byte[] reqContext = new byte[4];
Bits.putInt(reqContext, 0, requestId.incrementAndGet());
- DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server.getDataPartitionHolder(groupId);
+ DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
+ .getDataPartitionHolder(groupId);
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
- QueryTimeSeriesResponse response = null;
+ QueryTimeSeriesResponse response;
if (status.isOk()) {
try {
response = new QueryTimeSeriesResponse(false, true,
@@ -125,6 +131,11 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
return ((QueryTimeSeriesResponse) response).getTimeSeries();
}
+ /**
+ * Handle "show storage group" statement locally
+ *
+ * @return Set of storage group name
+ */
private Set<String> queryStorageGroupLocally() throws InterruptedException {
QueryStorageGroupRequest request = new
QueryStorageGroupRequest(ClusterConfig.METADATA_GROUP_ID);
SingleTask task = new SingleTask(false, request);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
index 7c2c12f..142362f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.rpc.request;
import java.io.Serializable;
-import org.apache.iotdb.cluster.rpc.MetadataType;
public class QueryTimeSeriesRequest extends BasicRequest implements
Serializable {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 3672ca4..a384830 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.rpc.MetadataType;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -107,7 +106,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
@Override
protected List<List<String>> getTimeSeriesForPath(String path)
- throws PathErrorException, InterruptedException {
+ throws PathErrorException, InterruptedException, ProcessorException {
return queryMetadataExecutor.get().processTimeSeriesQuery(path);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 364ef24..984bfb6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -83,22 +83,30 @@ public class RaftUtils {
*/
public static PeerId getTargetPeerID(String groupId) {
if (!groupLeaderCache.containsKey(groupId)) {
- PeerId randomPeerId;
- if (groupId.equals(CLUSTER_CONFIG.METADATA_GROUP_ID)) {
- RaftService service = (RaftService)
server.getMetadataHolder().getService();
- List<PeerId> peerIdList = service.getPeerIdList();
- randomPeerId = peerIdList.get(getRandomInt(peerIdList.size()));
- } else {
- PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
- PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
- randomPeerId = new PeerId(node.ip, node.port);
- }
+ PeerId randomPeerId = getRandomPeerID(groupId);
groupLeaderCache.put(groupId, randomPeerId);
}
return groupLeaderCache.get(groupId);
}
/**
+ * Get random peer id
+ */
+ public static PeerId getRandomPeerID(String groupId) {
+ PeerId randomPeerId;
+ if (groupId.equals(CLUSTER_CONFIG.METADATA_GROUP_ID)) {
+ RaftService service = (RaftService)
server.getMetadataHolder().getService();
+ List<PeerId> peerIdList = service.getPeerIdList();
+ randomPeerId = peerIdList.get(getRandomInt(peerIdList.size()));
+ } else {
+ PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+ PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
+ randomPeerId = convertPhysicalNode(node);
+ }
+ return randomPeerId;
+ }
+
+ /**
* Get random int from [0, bound).
*/
public static int getRandomInt(int bound) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 09f54ba..9326610 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.utils.hash;
+import com.alipay.sofa.jraft.util.OnlyForTest;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +30,9 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.ErrorConfigureExecption;
+/**
+ * Cluster router, it's responsible for hash mapping and routing to specified
data groups
+ */
public class Router {
/**
@@ -126,6 +130,8 @@ public class Router {
/**
* Calculate the physical nodes corresponding to the replications where a
data point is located
+ *
+ * @param objectKey storage group
*/
public PhysicalNode[] routeGroup(String objectKey) {
if (sgRouter.containsKey(objectKey)) {
@@ -145,6 +151,9 @@ public class Router {
return this.getGroupsNodes(new PhysicalNode(ip, port));
}
+ /**
+ * Add a new node to cluster
+ */
private void addNode(PhysicalNode node, int virtualNum) {
physicalRing.put(hashFunction.hash(node.getKey()), node);
for (int i = 0; i < virtualNum; i++) {
@@ -181,14 +190,14 @@ public class Router {
nodeMapGroupIdCache.clear();
}
- // only for test
+ @OnlyForTest
public void showPhysicalRing() {
for (Entry<Integer, PhysicalNode> entry : physicalRing.entrySet()) {
System.out.println(String.format("%d-%s", entry.getKey(),
entry.getValue().getKey()));
}
}
- //only for test
+ @OnlyForTest
public void showVirtualRing() {
for (Entry<Integer, VirtualNode> entry : virtualRing.entrySet()) {
System.out.println(String.format("%d-%s", entry.getKey(),
entry.getValue().getKey()));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 05fabfd..064446a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -232,7 +232,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
try {
List<List<String>> showTimeseriesList = getTimeSeriesForPath(path);
resp.setShowTimeseriesList(showTimeseriesList);
- } catch (PathErrorException | InterruptedException e) {
+ } catch (PathErrorException | InterruptedException |
ProcessorException e) {
status = getErrorStatus(
String.format("Failed to fetch timeseries %s's metadata
because: %s",
req.getColumnPath(), e));
@@ -269,7 +269,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
break;
case "METADATA_IN_JSON":
- String metadataInJson = null;
+ String metadataInJson;
try {
metadataInJson = MManager.getInstance().getMetadataInString();
} catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
@@ -348,7 +348,7 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
protected List<List<String>> getTimeSeriesForPath(String path)
- throws PathErrorException, InterruptedException {
+ throws PathErrorException, InterruptedException, ProcessorException {
return MManager.getInstance().getShowTimeseriesPath(path);
}