This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster- by this push:
new f22376b fix bug for TestUtils.serializedpartitionTable
f22376b is described below
commit f22376bd5a27fd1dcea2ab216e0490d2d440ec93
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Aug 18 11:33:26 2021 +0800
fix bug for TestUtils.serializedpartitionTable
---
.../java/org/apache/iotdb/cluster/ClientMain.java | 13 ++--
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 1 +
.../iotdb/cluster/client/DataClientProvider.java | 1 +
.../cluster/client/async/AsyncDataClient.java | 1 +
.../client/async/AsyncDataHeartbeatClient.java | 1 +
.../cluster/client/async/AsyncMetaClient.java | 1 +
.../client/async/AsyncMetaHeartbeatClient.java | 1 +
.../iotdb/cluster/client/sync/SyncClientPool.java | 28 ++++---
.../iotdb/cluster/client/sync/SyncDataClient.java | 73 ++++--------------
.../client/sync/SyncDataHeartbeatClient.java | 42 +++++------
.../iotdb/cluster/client/sync/SyncMetaClient.java | 60 +++++----------
.../client/sync/SyncMetaHeartbeatClient.java | 43 +++++------
...yncDataClient.java => TSDataServiceClient.java} | 85 +++++++++------------
...yncDataClient.java => TSMetaServiceClient.java} | 87 +++++++++-------------
.../query/last/ClusterLastQueryExecutor.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 1 +
.../cluster/server/member/MetaGroupMember.java | 1 +
.../cluster/client/async/AsyncClientPoolTest.java | 1 +
.../client/async/AsyncDataHeartbeatClientTest.java | 3 +-
.../cluster/client/async/AsyncMetaClientTest.java | 1 +
.../client/async/AsyncMetaHeartbeatClientTest.java | 1 +
.../cluster/client/sync/SyncDataClientTest.java | 14 +---
.../client/sync/SyncDataHeartbeatClientTest.java | 4 +-
.../cluster/client/sync/SyncMetaClientTest.java | 7 +-
.../client/sync/SyncMetaHeartbeatClientTest.java | 4 +-
.../org/apache/iotdb/cluster/common/TestUtils.java | 11 ++-
.../iotdb/cluster/integration/SingleNodeTest.java | 1 +
.../apache/iotdb/cluster/log/LogParserTest.java | 7 +-
.../cluster/log/applier/MetaLogApplierTest.java | 5 +-
.../cluster/log/catchup/LogCatchUpTaskTest.java | 2 +-
.../log/catchup/SnapshotCatchUpTaskTest.java | 4 +-
.../cluster/log/logtypes/SerializeLogTest.java | 5 +-
.../cluster/log/snapshot/DataSnapshotTest.java | 3 +-
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 1 +
pom.xml | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 3 +-
.../apache/iotdb/session/SessionConnection.java | 1 +
38 files changed, 222 insertions(+), 302 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index f56f699..1c28cd1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -19,12 +19,6 @@
package org.apache.iotdb.cluster;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.jdbc.Config;
@@ -49,6 +43,13 @@ import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index eebc015..e9fd8fe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.utils.TestOnly;
+
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 769150b..4f189f1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.db.utils.TestOnly;
+
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index cad58ba..c4ce231 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncClient;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
index 6bc280d..0fbf41e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index 7082ab6..f6fb94d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
index dcdb44e..cf0d3a2 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index cdfa5df..e0c55e8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -22,11 +22,10 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.utils.ClusterNode;
import org.apache.iotdb.db.utils.TestOnly;
-
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,8 @@ public class SyncClientPool {
private static final Logger logger =
LoggerFactory.getLogger(SyncClientPool.class);
private long waitClientTimeoutMS;
private int maxConnectionForEachNode;
- private Map<ClusterNode, Deque<Client>> clientCaches = new
ConcurrentHashMap<>();
+ // TODO do we really need a Node here, or could we just use its ID?
+ private Map<ClusterNode, Deque<RaftService.Client>> clientCaches = new
ConcurrentHashMap<>();
private Map<ClusterNode, Integer> nodeClientNumMap = new
ConcurrentHashMap<>();
private SyncClientFactory syncClientFactory;
@@ -58,7 +58,7 @@ public class SyncClientPool {
* @param node the node want to connect
* @return if the node can connect, return the client, otherwise null
*/
- public Client getClient(Node node) {
+ public RaftService.Client getClient(Node node) {
return getClient(node, true);
}
@@ -73,21 +73,23 @@ public class SyncClientPool {
* always try to connect so the node can be reactivated ASAP
* @return if the node can connect, return the client, otherwise null
*/
- public Client getClient(Node node, boolean activatedOnly) {
- ClusterNode clusterNode = new ClusterNode(node);
+ public RaftService.Client getClient(Node node, boolean activatedOnly) {
if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
return null;
}
+ ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n ->
new ArrayDeque<>());
+ Deque<RaftService.Client> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+
synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
return waitForClient(clientStack, clusterNode);
} else {
- Client client = null;
+ RaftService.Client client = null;
try {
client = syncClientFactory.getSyncClient(clusterNode, this);
} catch (TTransportException e) {
@@ -115,7 +117,8 @@ public class SyncClientPool {
}
@SuppressWarnings("squid:S2273") // synchronized outside
- private Client waitForClient(Deque<Client> clientStack, ClusterNode
clusterNode) {
+ private RaftService.Client waitForClient(
+ Deque<RaftService.Client> clientStack, ClusterNode clusterNode) {
// wait for an available client
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
@@ -125,7 +128,7 @@ public class SyncClientPool {
&& System.currentTimeMillis() - waitStart >= waitClientTimeoutMS) {
logger.warn(
"Cannot get an available client after {}ms, create a new one",
waitClientTimeoutMS);
- Client client = syncClientFactory.getSyncClient(clusterNode, this);
+ RaftService.Client client =
syncClientFactory.getSyncClient(clusterNode, this);
nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) ->
oldValue + 1);
return client;
}
@@ -155,10 +158,11 @@ public class SyncClientPool {
* @param node connection node
* @param client push client to pool
*/
- public void putClient(Node node, Client client) {
+ public void putClient(Node node, RaftService.Client client) {
ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n ->
new ArrayDeque<>());
+ Deque<RaftService.Client> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
synchronized (clientStack) {
if (client.getInputProtocol() != null &&
client.getInputProtocol().getTransport().isOpen()) {
clientStack.push(client);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 764afdf..2854670 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -21,73 +21,41 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
-import java.io.Closeable;
-import java.net.SocketException;
-
/**
* Notice: Because a client will be returned to a pool immediately after a
successful request, you
* should not cache it anywhere else or there may be conflicts.
*/
// the two classes does not share a common parent and Java does not allow
multiple extension
@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends Client implements Closeable {
-
- Node node;
- SyncClientPool pool;
+public class SyncDataClient extends TSDataServiceClient {
- SyncDataClient(TProtocol prot) {
+ /** @param prot this constructor just creates a new instance, but does not open
the connection */
+ @TestOnly
+ public SyncDataClient(TProtocol prot) {
super(prot);
}
- private SyncDataClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool
pool)
throws TTransportException {
-
- // the difference of the two clients lies in the port
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS())));
-
- this.node = node;
- this.pool = pool;
- getInputProtocol().getTransport().open();
- }
-
- public void setTimeout(int timeout) {
- // the same transport is used in both input and output
- ((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
- }
-
- @TestOnly
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport)
getInputProtocol().getTransport()).getTimeOut();
+ protocolFactory,
+ target.getInternalIp(),
+ target.getDataPort(),
+ ClusterConstant.getConnectionTimeoutInMS(),
+ target,
+ pool);
}
- public void putBack() {
- if (pool != null) {
- pool.putClient(node, this);
- } else {
- TProtocol inputProtocol = getInputProtocol();
- if (inputProtocol != null) {
- inputProtocol.getTransport().close();
- }
- }
- }
-
- /** put the client to pool, instead of close client. */
@Override
- public void close() {
- putBack();
+ public String toString() {
+ return String.format(
+ "SyncDataClient (ip = %s, port = %d, id = %d)",
+ target.getInternalIp(), target.getDataPort(),
target.getNodeIdentifier());
}
public static class Factory implements SyncClientFactory {
@@ -106,17 +74,8 @@ public class SyncDataClient extends Client implements
Closeable {
@Override
public String nodeInfo(Node node) {
return String.format(
- "DataNode (listenIp = %s, port = %d, id = %d)",
+ "DataNode (ip = %s, port = %d, id = %d)",
node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier());
}
}
-
- @Override
- public String toString() {
- return "DataClient{" + "node=" + node + '}';
- }
-
- public Node getNode() {
- return node;
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index 4f6c259..cea6eff 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -30,20 +29,28 @@ import org.apache.thrift.transport.TTransportException;
* Notice: Because a client will be returned to a pool immediately after a
successful request, you
* should not cache it anywhere else or there may be conflicts.
*/
-public class SyncDataHeartbeatClient extends SyncDataClient {
+public class SyncDataHeartbeatClient extends TSDataServiceClient {
- private SyncDataHeartbeatClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ private SyncDataHeartbeatClient(
+ TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS())));
- this.node = node;
- this.pool = pool;
- getInputProtocol().getTransport().open();
+ protocolFactory,
+ target.getInternalIp(),
+ target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+ ClusterConstant.getConnectionTimeoutInMS(),
+ target,
+ pool);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "SyncDataHBClient (ip = %s, port = %d, id = %d)",
+ target.getInternalIp(),
+ target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+ target.getNodeIdentifier());
}
public static class Factory implements SyncClientFactory {
@@ -63,21 +70,10 @@ public class SyncDataHeartbeatClient extends SyncDataClient
{
@Override
public String nodeInfo(Node node) {
return String.format(
- "DataNode (listenIp = %s, HB port = %d, id = %d)",
+ "DataHBNode (ip = %s, port = %d, id = %d)",
node.getInternalIp(),
node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
node.getNodeIdentifier());
}
}
-
- @Override
- public String toString() {
- return "SyncHeartbeatDataClient{"
- + "node="
- + super.getNode()
- + ","
- + "dataHeartbeatPort="
- + (super.getNode().getDataPort() +
ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET)
- + '}';
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index b5b3615..7060083 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -21,55 +21,42 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.db.utils.TestOnly;
+
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
-import java.io.Closeable;
-
/**
* Notice: Because a client will be returned to a pool immediately after a
successful request, you
* should not cache it anywhere else or there may be conflicts.
*/
// the two classes does not share a common parent and Java does not allow
multiple extension
@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncMetaClient extends Client implements Closeable {
-
- Node node;
- SyncClientPool pool;
+public class SyncMetaClient extends TSMetaServiceClient {
+ /** @param prot this constructor just creates a new instance, but does not open
the connection */
+ @TestOnly
SyncMetaClient(TProtocol prot) {
super(prot);
}
- private SyncMetaClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ private SyncMetaClient(TProtocolFactory protocolFactory, Node target,
SyncClientPool pool)
throws TTransportException {
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getMetaPort(),
- ClusterConstant.getConnectionTimeoutInMS())));
- this.node = node;
- this.pool = pool;
- getInputProtocol().getTransport().open();
+ protocolFactory,
+ target.getInternalIp(),
+ target.getMetaPort(),
+ ClusterConstant.getConnectionTimeoutInMS(),
+ target,
+ pool);
}
- public void putBack() {
- if (pool != null) {
- pool.putClient(node, this);
- } else {
- getInputProtocol().getTransport().close();
- }
- }
-
- /** put the client to pool, instead of close client. */
@Override
- public void close() {
- putBack();
+ public String toString() {
+ return String.format(
+ "SyncMetaClient (ip = %s, port = %d, id = %d)",
+ target.getInternalIp(), target.getMetaPort(),
target.getNodeIdentifier());
}
public static class Factory implements SyncClientFactory {
@@ -88,19 +75,8 @@ public class SyncMetaClient extends Client implements
Closeable {
@Override
public String nodeInfo(Node node) {
return String.format(
- "MetaNode (listenIp = %s, HB port = %d, id = %d)",
- node.getInternalIp(),
- node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- node.getNodeIdentifier());
+ "MetaNode (ip = %s, port = %d, id = %d)",
+ node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier());
}
}
-
- public Node getNode() {
- return node;
- }
-
- @Override
- public String toString() {
- return "SyncMetaClient{" + " node=" + node + ", pool=" + pool + "}";
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index 929db23..cbd4363 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.rpc.RpcTransportFactory;
+
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -30,20 +30,28 @@ import org.apache.thrift.transport.TTransportException;
* Notice: Because a client will be returned to a pool immediately after a
successful request, you
* should not cache it anywhere else or there may be conflicts.
*/
-public class SyncMetaHeartbeatClient extends SyncMetaClient {
+public class SyncMetaHeartbeatClient extends TSMetaServiceClient {
- private SyncMetaHeartbeatClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ private SyncMetaHeartbeatClient(
+ TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS())));
- this.node = node;
- this.pool = pool;
- getInputProtocol().getTransport().open();
+ protocolFactory,
+ target.getInternalIp(),
+ target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
+ ClusterConstant.getConnectionTimeoutInMS(),
+ target,
+ pool);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "SyncMetaHBClient (ip = %s, port = %d, id = %d)",
+ target.getInternalIp(),
+ target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
+ target.getNodeIdentifier());
}
public static class Factory implements SyncClientFactory {
@@ -63,19 +71,8 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
@Override
public String nodeInfo(Node node) {
return String.format(
- "MetaNode (listenIp = %s, port = %d, id = %d)",
+ "MetaHBNode (ip = %s, port = %d, id = %d)",
node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier());
}
}
-
- @Override
- public String toString() {
- return "SyncMetaHeartbeatClient{"
- + "node="
- + super.getNode()
- + ","
- + "metaHeartbeatPort="
- + (super.getNode().getMetaPort() +
ClusterUtils.META_HEARTBEAT_PORT_OFFSET)
- + '}';
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java
similarity index 57%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java
index 764afdf..3102ebd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java
@@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -32,33 +31,42 @@ import org.apache.thrift.transport.TTransportException;
import java.io.Closeable;
import java.net.SocketException;
-/**
- * Notice: Because a client will be returned to a pool immediately after a
successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-// the two classes does not share a common parent and Java does not allow
multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends Client implements Closeable {
+public abstract class TSDataServiceClient extends TSDataService.Client
implements Closeable {
- Node node;
+ Node target;
SyncClientPool pool;
- SyncDataClient(TProtocol prot) {
+ /** @param prot this constructor just creates a new instance, but does not open
the connection */
+ @TestOnly
+ TSDataServiceClient(TProtocol prot) {
super(prot);
}
- private SyncDataClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ /**
+ * create a new client and open the connection
+ *
+ * @param protocolFactory
+ * @param ip
+ * @param port
+ * @param timeoutInMS
+ * @param target
+ * @param pool
+ * @throws TTransportException
+ */
+ public TSDataServiceClient(
+ TProtocolFactory protocolFactory,
+ String ip,
+ int port,
+ int timeoutInMS,
+ Node target,
+ SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
super(
protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS())));
-
- this.node = node;
+ RpcTransportFactory.INSTANCE.getTransport(ip, port, timeoutInMS)));
+ this.target = target;
this.pool = pool;
getInputProtocol().getTransport().open();
}
@@ -73,9 +81,16 @@ public class SyncDataClient extends Client implements
Closeable {
return ((TimeoutChangeableTransport)
getInputProtocol().getTransport()).getTimeOut();
}
+ /**
+ * if the client does not open the connection, remove it.
+ *
+ * <p>if the client's connection is closed, create a new one.
+ *
+ * <p>if the client's connection is fine, put it back to the pool
+ */
public void putBack() {
if (pool != null) {
- pool.putClient(node, this);
+ pool.putClient(target, this);
} else {
TProtocol inputProtocol = getInputProtocol();
if (inputProtocol != null) {
@@ -90,33 +105,7 @@ public class SyncDataClient extends Client implements
Closeable {
putBack();
}
- public static class Factory implements SyncClientFactory {
-
- private TProtocolFactory protocolFactory;
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public SyncDataClient getSyncClient(Node node, SyncClientPool pool) throws
TTransportException {
- return new SyncDataClient(protocolFactory, node, pool);
- }
-
- @Override
- public String nodeInfo(Node node) {
- return String.format(
- "DataNode (listenIp = %s, port = %d, id = %d)",
- node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier());
- }
- }
-
- @Override
- public String toString() {
- return "DataClient{" + "node=" + node + '}';
- }
-
- public Node getNode() {
- return node;
+ public Node getTarget() {
+ return target;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java
similarity index 57%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java
index 764afdf..6a3b0dd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java
@@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -32,33 +31,40 @@ import org.apache.thrift.transport.TTransportException;
import java.io.Closeable;
import java.net.SocketException;
-/**
- * Notice: Because a client will be returned to a pool immediately after a
successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-// the two classes does not share a common parent and Java does not allow
multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends Client implements Closeable {
-
- Node node;
+public class TSMetaServiceClient extends TSMetaService.Client implements
Closeable {
+ Node target;
SyncClientPool pool;
- SyncDataClient(TProtocol prot) {
+ /** @param prot this constructor just creates a new instance, but does not open
the connection */
+ @TestOnly
+ public TSMetaServiceClient(TProtocol prot) {
super(prot);
}
-
- private SyncDataClient(TProtocolFactory protocolFactory, Node node,
SyncClientPool pool)
+ /**
+ * create a new client and open the connection
+ *
+ * @param protocolFactory
+ * @param ip
+ * @param port
+ * @param timeoutInMS
+ * @param target
+ * @param pool
+ * @throws TTransportException
+ */
+ public TSMetaServiceClient(
+ TProtocolFactory protocolFactory,
+ String ip,
+ int port,
+ int timeoutInMS,
+ Node target,
+ SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
super(
protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(
- node.getInternalIp(),
- node.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS())));
-
- this.node = node;
+ RpcTransportFactory.INSTANCE.getTransport(ip, port, timeoutInMS)));
+ this.target = target;
this.pool = pool;
getInputProtocol().getTransport().open();
}
@@ -73,9 +79,16 @@ public class SyncDataClient extends Client implements
Closeable {
return ((TimeoutChangeableTransport)
getInputProtocol().getTransport()).getTimeOut();
}
+ /**
+ * if the client does not open the connection, remove it.
+ *
+ * <p>if the client's connection is closed, create a new one.
+ *
+ * <p>if the client's connection is fine, put it back to the pool
+ */
public void putBack() {
if (pool != null) {
- pool.putClient(node, this);
+ pool.putClient(target, this);
} else {
TProtocol inputProtocol = getInputProtocol();
if (inputProtocol != null) {
@@ -90,33 +103,7 @@ public class SyncDataClient extends Client implements
Closeable {
putBack();
}
- public static class Factory implements SyncClientFactory {
-
- private TProtocolFactory protocolFactory;
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public SyncDataClient getSyncClient(Node node, SyncClientPool pool) throws
TTransportException {
- return new SyncDataClient(protocolFactory, node, pool);
- }
-
- @Override
- public String nodeInfo(Node node) {
- return String.format(
- "DataNode (listenIp = %s, port = %d, id = %d)",
- node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier());
- }
- }
-
- @Override
- public String toString() {
- return "DataClient{" + "node=" + node + '}';
- }
-
- public Node getNode() {
- return node;
+ public Node getTarget() {
+ return target;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index b4e9d5d..d22aeda 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -275,7 +275,7 @@ public class ClusterLastQueryExecutor extends
LastQueryExecutor {
context.getQueryId(),
queryPlan.getDeviceToMeasurements(),
group.getHeader(),
- syncDataClient.getNode()));
+ syncDataClient.getTarget()));
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
syncDataClient.getInputProtocol().getTransport().close();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index d838944..aaf14ec 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 59012a1..04010a1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
index 99fe589..a08d9fe 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
@@ -12,6 +12,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.After;
import org.junit.Before;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
index 76ab0f7..c5303c8 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
@@ -19,11 +19,12 @@
package org.apache.iotdb.cluster.client.async;
-import junit.framework.TestCase;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+
+import junit.framework.TestCase;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index c1b73fc..53cc1a0 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -10,6 +10,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
index 41ed581..4cee53a 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.After;
import org.junit.Assert;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
index 5f0334e..feeb9ba 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
@@ -44,7 +44,7 @@ public class SyncDataClientTest {
SyncDataClient client;
client = (SyncDataClient) syncClientPool.getClient(node);
- assertEquals(node, client.getNode());
+ assertEquals(node, client.getTarget());
client.setTimeout(1000);
assertEquals(1000, client.getTimeout());
@@ -54,10 +54,7 @@ public class SyncDataClientTest {
assertEquals(client, newClient);
assertTrue(client.getInputProtocol().getTransport().isOpen());
- assertEquals(
- "DataClient{node=ClusterNode{ internalIp='localhost', metaPort=0,
nodeIdentifier=0,"
- + " dataPort=40010, clientPort=0, clientIp='localhost'}}",
- client.toString());
+ assertEquals("SyncDataClient (ip = localhost, port = 40010, id = 0)",
client.toString());
client =
new SyncDataClient(
@@ -95,7 +92,7 @@ public class SyncDataClientTest {
new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncDataClient clientOut;
try (SyncDataClient clientIn = (SyncDataClient)
syncClientPool.getClient(node)) {
- assertEquals(node, clientIn.getNode());
+ assertEquals(node, clientIn.getTarget());
clientIn.setTimeout(1000);
clientOut = clientIn;
assertEquals(1000, clientIn.getTimeout());
@@ -104,10 +101,7 @@ public class SyncDataClientTest {
try (SyncDataClient newClient = (SyncDataClient)
syncClientPool.getClient(node)) {
assertEquals(clientOut, newClient);
- assertEquals(
- "DataClient{node=ClusterNode{ internalIp='localhost', metaPort=0,
nodeIdentifier=0,"
- + " dataPort=40010, clientPort=0, clientIp='localhost'}}",
- newClient.toString());
+ assertEquals("SyncDataClient (ip = localhost, port = 40010, id = 0)",
newClient.toString());
}
try (SyncDataClient clientIn =
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
index a03e6b5..d81a424 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
@@ -53,9 +53,7 @@ public class SyncDataHeartbeatClientTest {
new SyncDataHeartbeatClient.Factory(new Factory());
SyncDataHeartbeatClient syncClient = factoryAsync.getSyncClient(node,
null);
Assert.assertEquals(
- "SyncHeartbeatDataClient{node=Node(internalIp:localhost, metaPort:0,
nodeIdentifier:0,"
- + " dataPort:40010, clientPort:0,
clientIp:localhost),dataHeartbeatPort=40011}",
- syncClient.toString());
+ "SyncDataHBClient (ip = localhost, port = 40011, id = 0)",
syncClient.toString());
} finally {
serverSocket.close();
listenThread.interrupt();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
index d0d994f..2bc64dd 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
@@ -8,6 +8,7 @@ import
org.apache.iotdb.cluster.client.sync.SyncMetaClient.Factory;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.rpc.TSocketWrapper;
+
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.Test;
@@ -44,7 +45,7 @@ public class SyncMetaClientTest {
SyncMetaClient client;
client = (SyncMetaClient) syncClientPool.getClient(node);
- assertEquals(node, client.getNode());
+ assertEquals(node, client.getTarget());
client.putBack();
Client newClient = syncClientPool.getClient(node);
@@ -87,12 +88,12 @@ public class SyncMetaClientTest {
new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncMetaClient clientOut;
try (SyncMetaClient clientIn = (SyncMetaClient)
syncClientPool.getClient(node); ) {
- assertEquals(node, clientIn.getNode());
+ assertEquals(node, clientIn.getTarget());
clientOut = clientIn;
}
try (SyncMetaClient newClientIn = (SyncMetaClient)
syncClientPool.getClient(node)) {
- assertEquals(node, newClientIn.getNode());
+ assertEquals(node, newClientIn.getTarget());
assertEquals(clientOut, newClientIn);
}
assertTrue(clientOut.getInputProtocol().getTransport().isOpen());
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
index 146d182..45fb718 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
@@ -53,9 +53,7 @@ public class SyncMetaHeartbeatClientTest {
new SyncMetaHeartbeatClient.Factory(new TBinaryProtocol.Factory());
SyncMetaHeartbeatClient syncClient = factoryAsync.getSyncClient(node,
null);
Assert.assertEquals(
- "SyncMetaHeartbeatClient{node=Node(internalIp:localhost,
metaPort:9003,"
- + " nodeIdentifier:0, dataPort:0, clientPort:0,
clientIp:localhost),metaHeartbeatPort=9004}",
- syncClient.toString());
+ "SyncMetaHBClient (ip = localhost, port = 9004, id = 0)",
syncClient.toString());
} finally {
serverSocket.close();
listenThread.interrupt();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 986b019..7951dc2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -70,7 +70,14 @@ public class TestUtils {
public static long TEST_TIME_OUT_MS = 200;
- public static ByteBuffer seralizePartitionTable =
getPartitionTable(3).serialize();
+ private static ByteBuffer seralizePartitionTable =
getPartitionTable(3).serialize();
+
+ // we need to reset the ByteBuffer's position because it may have been changed,
e.g., in
+ // MetaLogApplierTest.testApplyAddNode()
+ public static ByteBuffer getSeralizePartitionTable() {
+ seralizePartitionTable.rewind();
+ return seralizePartitionTable;
+ }
private TestUtils() {
// util class
@@ -96,7 +103,7 @@ public class TestUtils {
for (int i = 0; i < logNum; i++) {
AddNodeLog log = new AddNodeLog();
log.setNewNode(getNode(i));
- log.setPartitionTable(seralizePartitionTable);
+ log.setPartitionTable(getSeralizePartitionTable());
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
logList.add(log);
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
index f806a9d..b4267b7 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 89ef5cb..5abb81a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-
import org.junit.Test;
import java.io.IOException;
@@ -49,7 +48,7 @@ public class LogParserTest {
public void testAddNodeLog() throws UnknownLogTypeException {
AddNodeLog log = new AddNodeLog();
log.setNewNode(TestUtils.getNode(5));
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
log.setCurrLogIndex(8);
log.setCurrLogTerm(8);
@@ -86,7 +85,7 @@ public class LogParserTest {
@Test
public void testRemoveNodeLog() throws UnknownLogTypeException {
RemoveNodeLog log = new RemoveNodeLog();
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
log.setRemovedNode(TestUtils.getNode(0));
log.setCurrLogIndex(8);
log.setCurrLogTerm(8);
@@ -108,7 +107,7 @@ public class LogParserTest {
@Test
public void testLogPlan() {
- AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable,
TestUtils.getNode(0));
+ AddNodeLog log = new AddNodeLog(TestUtils.getSeralizePartitionTable(),
TestUtils.getNode(0));
log.setMetaLogIndex(1);
try {
LogPlan logPlan = new LogPlan(log.serialize());
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index d6d6d02..93b1469 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
import org.junit.After;
import org.junit.Test;
@@ -88,7 +87,7 @@ public class MetaLogApplierTest extends IoTDBTest {
Node node = new Node("localhost", 1111, 0, 2222, Constants.RPC_PORT,
"localhost");
AddNodeLog log = new AddNodeLog();
log.setNewNode(node);
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
applier.apply(log);
assertTrue(nodes.contains(node));
@@ -100,7 +99,7 @@ public class MetaLogApplierTest extends IoTDBTest {
Node node = testMetaGroupMember.getThisNode();
RemoveNodeLog log = new RemoveNodeLog();
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
log.setRemovedNode(node);
applier.apply(log);
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 73b91c1..0252720 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -243,7 +243,7 @@ public class LogCatchUpTaskTest {
+ "nodeIdentifier:6, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.7, metaPort:9003, "
+ "nodeIdentifier:7, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.8, metaPort:9003, "
+ "nodeIdentifier:8, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.9, metaPort:9003, "
- + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)]",
+ + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)], id = 0",
e.getMessage());
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index 764d3b8..e9b3dc4 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -244,7 +244,7 @@ public class SnapshotCatchUpTaskTest {
+ "nodeIdentifier:6, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.7, metaPort:9003, "
+ "nodeIdentifier:7, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.8, metaPort:9003, "
+ "nodeIdentifier:8, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.9, metaPort:9003, "
- + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)]",
+ + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)], id = 0",
e.getMessage());
}
@@ -277,7 +277,7 @@ public class SnapshotCatchUpTaskTest {
+ "nodeIdentifier:6, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.7, metaPort:9003, "
+ "nodeIdentifier:7, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.8, metaPort:9003, "
+ "nodeIdentifier:8, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0), Node(internalIp:192.168.0.9, metaPort:9003, "
- + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)]",
+ + "nodeIdentifier:9, dataPort:40010, clientPort:6667,
clientIp:0.0.0.0)], id = 0",
e.getMessage());
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 8ffa221..52da961 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -101,7 +100,7 @@ public class SerializeLogTest {
@Test
public void testAddNodeLog() throws UnknownLogTypeException {
AddNodeLog log = new AddNodeLog();
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
log.setNewNode(
@@ -126,7 +125,7 @@ public class SerializeLogTest {
@Test
public void testRemoveNodeLog() throws UnknownLogTypeException {
RemoveNodeLog log = new RemoveNodeLog();
- log.setPartitionTable(TestUtils.seralizePartitionTable);
+ log.setPartitionTable(TestUtils.getSeralizePartitionTable());
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
log.setRemovedNode(TestUtils.getNode(0));
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 063747c..1c7ad73 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -39,7 +39,6 @@ import
org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -157,7 +156,7 @@ public abstract class DataSnapshotTest {
throws TException {
if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
// simulate failures
- throw new TException("Faked network failure");
+ throw new TException("[Ignore me in tests] Faked network
failure");
}
try {
return IOUtils.readFile(filePath, offset, length);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index a0c696c..07bd124 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
diff --git a/pom.xml b/pom.xml
index 3e312c2..f1c98e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,7 +385,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
- <version>1.10.19</version>
+ <version>2.0.2-beta</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 63c057e..190edca 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.conf;
-import com.google.common.net.InetAddresses;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -29,6 +28,8 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSType;
+
+import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 066338f..a5467ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.sync.sender.transfer;
-import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -41,6 +40,8 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
+
+import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6f0fcd1..7504860 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;