This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch testcontainer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 009e6215a8146fb5af8c6bdf5768b81912f2bdd2 Author: chaow <[email protected]> AuthorDate: Mon Apr 12 09:28:29 2021 +0800 Optimize sync leader for meta (#2987) --- .../resources/conf/iotdb-cluster.properties | 4 +- .../apache/iotdb/cluster/config/ClusterConfig.java | 10 --- .../iotdb/cluster/config/ClusterDescriptor.java | 5 -- .../iotdb/cluster/server/DataClusterServer.java | 6 +- .../iotdb/cluster/server/MetaClusterServer.java | 6 +- .../iotdb/cluster/server/member/RaftMember.java | 77 +++++++++++++--------- .../cluster/server/service/BaseAsyncService.java | 19 +++++- .../cluster/server/service/BaseSyncService.java | 23 +++++-- .../query/ClusterDataQueryExecutorTest.java | 3 - .../cluster/server/member/DataGroupMemberTest.java | 5 +- .../cluster/server/member/RaftMemberTest.java | 9 +-- thrift-cluster/src/main/thrift/cluster.thrift | 8 ++- 12 files changed, 103 insertions(+), 72 deletions(-) diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties index 4126539..73e7b42 100644 --- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties +++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties @@ -170,6 +170,4 @@ max_client_pernode_permember_number=1000 # If the number of connections created for a node exceeds `max_client_pernode_permember_number`, # we need to wait so much time for other connections to be released until timeout, # or a new connection will be created. -wait_client_timeout_ms=5000 - -enable_query_redirect=false \ No newline at end of file +wait_client_timeout_ms=5000 \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 3d1cd32..11cedc8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -164,8 +164,6 @@ public class ClusterConfig { private boolean openServerRpcPort = false; - private boolean enableQueryRedirect = false; - public int getSelectorNumOfClientPool() { return selectorNumOfClientPool; } @@ -469,12 +467,4 @@ public class ClusterConfig { public void setWaitClientTimeoutMS(long waitClientTimeoutMS) { this.waitClientTimeoutMS = waitClientTimeoutMS; } - - public boolean isEnableQueryRedirect() { - return enableQueryRedirect; - } - - public void setEnableQueryRedirect(boolean enableQueryRedirect) { - this.enableQueryRedirect = enableQueryRedirect; - } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java index a2488f7..25f85e0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java @@ -299,11 +299,6 @@ public class ClusterDescriptor { properties.getProperty( "wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS())))); - config.setEnableQueryRedirect( - Boolean.parseBoolean( - properties.getProperty( - "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect())))); - String consistencyLevel = properties.getProperty("consistency_level"); if (consistencyLevel != null) { config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel)); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index a54de23..e4c81f8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -46,6 +46,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.rpc.thrift.TSDataService; @@ -307,7 +308,8 @@ public class DataClusterServer extends RaftServer } @Override - public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) { + public void requestCommitIndex( + Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index"); if (service != null) { service.requestCommitIndex(header, resultHandler); @@ -919,7 +921,7 @@ public class DataClusterServer extends RaftServer } @Override - public long requestCommitIndex(Node header) throws TException { + public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException { return getDataSyncService(header).requestCommitIndex(header); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index 02d53b3..12e286f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus; @@ -224,7 +225,8 @@ public class MetaClusterServer extends RaftServer } @Override - public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) { + public void requestCommitIndex( + Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { asyncService.requestCommitIndex(header, resultHandler); } @@ -331,7 +333,7 @@ public class MetaClusterServer extends RaftServer } @Override - public long requestCommitIndex(Node header) throws TException { + public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException { return syncService.requestCommitIndex(header); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 571424a..426c370 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -47,6 +47,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; @@ -406,30 +407,7 @@ public abstract class RaftMember { response.setLastLogTerm(logManager.getLastLogTerm()); } - if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) { - // there are more local logs that can be committed, commit them in a ThreadPool so the - // heartbeat response will not be blocked - CommitLogTask commitLogTask = - new CommitLogTask( - logManager, request.getCommitLogIndex(), request.getCommitLogTerm()); - commitLogTask.registerCallback(new CommitLogCallback(this)); - // if the log is not consistent, the commitment will be blocked until the leader makes the - // node catch up - if (commitLogPool != null && !commitLogPool.isShutdown()) { - commitLogPool.submit(commitLogTask); - } - - logger.debug( - "{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, " - + "localLast: {}-{}", - name, - request.getCommitLogIndex(), - request.getCommitLogTerm(), - logManager.getCommitLogIndex(), - logManager.getCommitLogTerm(), - logManager.getLastLogIndex(), - logManager.getLastLogTerm()); - } + tryUpdateCommitIndex(leaderTerm, request.getCommitLogIndex(), request.getCommitLogTerm()); if (logger.isTraceEnabled()) { logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader()); @@ -439,6 +417,31 @@ public abstract class RaftMember { } } + private void tryUpdateCommitIndex(long leaderTerm, long commitIndex, long commitTerm) { + if (leaderTerm >= term.get() && logManager.getCommitLogIndex() < commitIndex) { + // there are more local logs that can be committed, commit them in a ThreadPool so the + // heartbeat response will not be blocked + CommitLogTask commitLogTask = new CommitLogTask(logManager, commitIndex, commitTerm); + commitLogTask.registerCallback(new CommitLogCallback(this)); + // if the log is not consistent, the commitment will be blocked until the leader makes the + // node catch up + if (commitLogPool != null && !commitLogPool.isShutdown()) { + commitLogPool.submit(commitLogTask); + } + + logger.debug( + "{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, " + + "localLast: {}-{}", + name, + commitIndex, + commitTerm, + logManager.getCommitLogIndex(), + logManager.getCommitLogTerm(), + logManager.getLastLogIndex(), + logManager.getLastLogTerm()); + } + } + /** * Process an ElectionRequest. If the request comes from the last leader, accept it. Else decide * whether to accept by examining the log status of the elector. @@ -872,8 +875,14 @@ public abstract class RaftMember { protected boolean waitUntilCatchUp(CheckConsistency checkConsistency) throws CheckConsistencyException { long leaderCommitId = Long.MIN_VALUE; + RequestCommitIndexResponse response; try { - leaderCommitId = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync(); + response = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync(); + leaderCommitId = response.getCommitLogIndex(); + + tryUpdateCommitIndex( + response.getTerm(), response.getCommitLogIndex(), response.getCommitLogTerm()); + return syncLocalApply(leaderCommitId); } catch (TException e) { logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e); @@ -1057,9 +1066,12 @@ public abstract class RaftMember { } @SuppressWarnings("java:S2274") // enable timeout - protected long requestCommitIdAsync() throws TException, InterruptedException { + protected RequestCommitIndexResponse requestCommitIdAsync() + throws TException, InterruptedException { // use Long.MAX_VALUE to indicate a timeout - AtomicReference<Long> commitIdResult = new AtomicReference<>(Long.MAX_VALUE); + RequestCommitIndexResponse response = + new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE); + AtomicReference<RequestCommitIndexResponse> commitIdResult = new AtomicReference<>(response); AsyncClient client = getAsyncClient(leader.get()); if (client == null) { // cannot connect to the leader @@ -1073,24 +1085,25 @@ public abstract class RaftMember { return commitIdResult.get(); } - private long requestCommitIdSync() throws TException { + private RequestCommitIndexResponse requestCommitIdSync() throws TException { Client client = getSyncClient(leader.get()); + RequestCommitIndexResponse response; if (client == null) { // cannot connect to the leader logger.warn(MSG_NO_LEADER_IN_SYNC, name); // use Long.MAX_VALUE to indicate a timeouts - return Long.MAX_VALUE; + response = new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE); + return response; } - long commitIndex; try { - commitIndex = client.requestCommitIndex(getHeader()); + response = client.requestCommitIndex(getHeader()); } catch (TException e) { client.getInputProtocol().getTransport().close(); throw e; } finally { ClientUtils.putBackSyncClient(client); } - return commitIndex; + return response; } /** diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java index 07dbdea..8673078 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java @@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.utils.IOUtils; @@ -85,10 +86,22 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { } @Override - public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) { - long commitIndex = member.getCommitIndex(); + public void requestCommitIndex( + Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { + long commitIndex; + long commitTerm; + long curTerm; + synchronized (member.getTerm()) { + commitIndex = member.getLogManager().getCommitLogIndex(); + commitTerm = member.getLogManager().getCommitLogTerm(); + curTerm = member.getTerm().get(); + } + + RequestCommitIndexResponse response = + new RequestCommitIndexResponse(curTerm, commitIndex, commitTerm); + if (commitIndex != Long.MIN_VALUE) { - resultHandler.onComplete(commitIndex); + resultHandler.onComplete(response); return; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java index 697f54e..ce200ab 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java @@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService; import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.utils.ClientUtils; @@ -93,10 +94,22 @@ public abstract class BaseSyncService implements RaftService.Iface { } @Override - public long requestCommitIndex(Node header) throws TException { - long commitIndex = member.getCommitIndex(); + public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException { + + long commitIndex; + long commitTerm; + long curTerm; + synchronized (member.getTerm()) { + commitIndex = member.getLogManager().getCommitLogIndex(); + commitTerm = member.getLogManager().getCommitLogTerm(); + curTerm = member.getTerm().get(); + } + + RequestCommitIndexResponse response = + new RequestCommitIndexResponse(curTerm, commitIndex, commitTerm); + if (commitIndex != Long.MIN_VALUE) { - return commitIndex; + return response; } member.waitLeader(); @@ -105,14 +118,14 @@ public abstract class BaseSyncService implements RaftService.Iface { throw new TException(new LeaderUnknownException(member.getAllNodes())); } try { - commitIndex = client.requestCommitIndex(header); + response = client.requestCommitIndex(header); } catch (TException e) { client.getInputProtocol().getTransport().close(); throw e; } finally { ClientUtils.putBackSyncClient(client); } - return commitIndex; + return response; } @Override diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java index bf88b89..8b7e98d 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java @@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query; import org.apache.iotdb.cluster.common.TestUtils; -import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -52,14 +51,12 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest { @Before public void setUp() throws Exception { super.setUp(); - ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true); } @Override @After public void tearDown() throws Exception { super.tearDown(); - ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false); } @Test diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index 77e72b2..f6d5e5d 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -48,6 +48,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.server.NodeCharacter; @@ -244,11 +245,11 @@ public class DataGroupMemberTest extends BaseMember { @Override public void requestCommitIndex( - Node header, AsyncMethodCallback<Long> resultHandler) { + Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) { new Thread( () -> { if (enableSyncLeader) { - resultHandler.onComplete(-1L); + resultHandler.onComplete(new RequestCommitIndexResponse()); } else { resultHandler.onError(new TestException()); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java index 494f845..694c4aa 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/RaftMemberTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.Response; @@ -178,8 +179,8 @@ public class RaftMemberTest extends BaseMember { } @Override - protected long requestCommitIdAsync() { - return 5; + protected RequestCommitIndexResponse requestCommitIdAsync() { + return new RequestCommitIndexResponse(5, 5, 5); } @Override @@ -215,8 +216,8 @@ public class RaftMemberTest extends BaseMember { } @Override - protected long requestCommitIdAsync() { - return 1000L; + protected RequestCommitIndexResponse requestCommitIdAsync() { + return new RequestCommitIndexResponse(1000, 1000, 1000); } @Override diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift index c8edbe3..f23130e 100644 --- a/thrift-cluster/src/main/thrift/cluster.thrift +++ b/thrift-cluster/src/main/thrift/cluster.thrift @@ -58,6 +58,12 @@ struct HeartBeatResponse { 7: optional Node header } +struct RequestCommitIndexResponse { + 1: required long term // leader's meta log + 2: required long commitLogIndex // leader's meta log + 3: required long commitLogTerm +} + // node -> node struct ElectionRequest { 1: required long term @@ -311,7 +317,7 @@ service RaftService { * Ask the leader for its commit index, used to check whether the node has caught up with the * leader. **/ - long requestCommitIndex(1:Node header) + RequestCommitIndexResponse requestCommitIndex(1:Node header) /**
