This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5400_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4c6bfa96cddfd20eaef4491ddb74c3c03a37d317 Author: Potato <[email protected]> AuthorDate: Thu Jan 12 16:31:50 2023 +0800 [IOTDB-5400] IoTConsensus loses data when replica number change from 1 to several (#8837) --- .../apache/iotdb/consensus/iot/IoTConsensus.java | 1 + .../consensus/iot/IoTConsensusServerImpl.java | 32 ++++++++++++++-------- .../consensus/iot/IoTConsensusServerMetrics.java | 2 +- .../consensus/iot/logdispatcher/LogDispatcher.java | 2 +- .../service/IoTConsensusRPCServiceProcessor.java | 2 +- .../apache/iotdb/consensus/iot/ReplicateTest.java | 32 +++++++++++----------- 6 files changed, 41 insertions(+), 30 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 5135d384a2..5cfe53459d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus { // step 2: notify all the other Peers to build the sync connection to newPeer logger.info("[IoTConsensus] notify current peers to build sync log..."); + impl.checkAndLockSafeDeletedSearchIndex(); impl.notifyPeersToBuildSyncLogChannel(peer); // step 3: take snapshot diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 1d668c281a..298a8caa12 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -98,7 +98,7 @@ public class IoTConsensusServerImpl { private final Condition stateMachineCondition = stateMachineLock.newCondition(); private final String storageDir; private final List<Peer> configuration; - private final AtomicLong index; + private final AtomicLong searchIndex; private final LogDispatcher logDispatcher; private final IoTConsensusConfig config; private final ConsensusReqReader reader; @@ -136,7 +136,7 @@ public class IoTConsensusServerImpl { // only one configuration means single replica. reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE); } - this.index = new AtomicLong(currentSearchIndex); + this.searchIndex = new AtomicLong(currentSearchIndex); this.consensusGroupId = thisNode.getGroupId().toString(); this.metrics = new IoTConsensusServerMetrics(this); } @@ -180,7 +180,7 @@ public class IoTConsensusServerImpl { if (needBlockWrite()) { logger.info( "[Throttle Down] index:{}, safeIndex:{}", - getIndex(), + getSearchIndex(), getCurrentSafelyDeletedSearchIndex()); try { boolean timeout = @@ -240,9 +240,9 @@ public class IoTConsensusServerImpl { // is not expected and will slow down the preparation speed for batch. // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are // in one transaction. - synchronized (index) { + synchronized (searchIndex) { logDispatcher.offer(indexedConsensusRequest); - index.incrementAndGet(); + searchIndex.incrementAndGet(); } // statistic the time of offering request into queue MetricService.getInstance() @@ -456,7 +456,7 @@ public class IoTConsensusServerImpl { if (peer.equals(thisNode)) { // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the // snapshot produced by thisNode - buildSyncLogChannel(targetPeer, index.get()); + buildSyncLogChannel(targetPeer, searchIndex.get()); } else { // use RPC to tell other peers to build sync log channel to target peer try (SyncIoTConsensusServiceClient client = @@ -668,7 +668,7 @@ public class IoTConsensusServerImpl { public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( IConsensusRequest request) { - return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request)); + return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request)); } public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest( @@ -682,7 +682,7 @@ public class IoTConsensusServerImpl { * single copies, the current index is selected */ public long getCurrentSafelyDeletedSearchIndex() { - return logDispatcher.getMinSyncIndex().orElseGet(index::get); + return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get); } public String getStorageDir() { @@ -697,8 +697,8 @@ public class IoTConsensusServerImpl { return configuration; } - public long getIndex() { - return index.get(); + public long getSearchIndex() { + return searchIndex.get(); } public IoTConsensusConfig getConfig() { @@ -723,7 +723,7 @@ public class IoTConsensusServerImpl { } public AtomicLong getIndexObject() { - return index; + return searchIndex; } public boolean isReadOnly() { @@ -768,4 +768,14 @@ public class IoTConsensusServerImpl { } } } + + /** + * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data + * lost. + */ + public void checkAndLockSafeDeletedSearchIndex() { + if (configuration.size() == 1) { + reader.setSafelyDeletedSearchIndex(searchIndex.get()); + } + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java index 88b25486b7..1e36e20b70 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java @@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet { Metric.IOT_CONSENSUS.toString(), MetricLevel.IMPORTANT, impl, - IoTConsensusServerImpl::getIndex, + IoTConsensusServerImpl::getSearchIndex, Tag.NAME.toString(), "ioTConsensusServerImpl", Tag.REGION.toString(), diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 752a392ed1..93de711b76 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -344,7 +344,7 @@ public class LogDispatcher { long startIndex = syncStatus.getNextSendingIndex(); long maxIndex; synchronized (impl.getIndexObject()) { - maxIndex = impl.getIndex() + 1; + maxIndex = impl.getSearchIndex() + 1; logger.debug( "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}", impl.getThisNode().getGroupId(), diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index f966528cdc..6b24f596a2 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0)); return; } - long searchIndex = impl.getIndex(); + long searchIndex = impl.getSearchIndex(); long safeIndex = impl.getCurrentSafelyDeletedSearchIndex(); resultHandler.onComplete( new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex)); diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index e1675e7994..a36db6e3ea 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -120,17 +120,17 @@ public class ReplicateTest { servers.get(1).createPeer(group.getGroupId(), group.getPeers()); servers.get(2).createPeer(group.getGroupId(), group.getPeers()); - Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); servers.get(1).write(gid, new TestEntry(i, peers.get(1))); servers.get(2).write(gid, new TestEntry(i, peers.get(2))); - Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex()); } for (int i = 0; i < 3; i++) { @@ -163,9 +163,9 @@ public class ReplicateTest { Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration()); Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < 3; i++) { long start = System.currentTimeMillis(); @@ -197,14 +197,14 @@ public class ReplicateTest { servers.get(0).createPeer(group.getGroupId(), group.getPeers()); servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex()); + Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); servers.get(1).write(gid, new TestEntry(i, peers.get(1))); - Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex()); + Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex()); } Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex()); @@ -219,9 +219,9 @@ public class ReplicateTest { Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration()); Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < 2; i++) { long start = System.currentTimeMillis();
