This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch test_wal_sync_pr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5f4b1dc74c663a65930ee5fd1d43ca9670c9b2fc Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Jun 28 17:57:18 2022 +0800 remove logs --- .../multileader/MultiLeaderServerImpl.java | 4 -- .../multileader/logdispatcher/LogDispatcher.java | 70 +++++++--------------- .../multileader/wal/ConsensusReqReader.java | 3 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../mpp/plan/analyze/ClusterPartitionFetcher.java | 1 - .../config/executor/ClusterConfigTaskExecutor.java | 5 -- .../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 1 - .../java/org/apache/iotdb/db/wal/node/WALNode.java | 8 +-- 8 files changed, 29 insertions(+), 65 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index 7682f118a8..272c244a1e 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -104,10 +104,6 @@ public class MultiLeaderServerImpl { synchronized (stateMachine) { IndexedConsensusRequest indexedConsensusRequest = buildIndexedConsensusRequestForLocalRequest(request); - logger.info( - "index after build: safeIndex: {}, searchIndex: {}", - indexedConsensusRequest.getSafelyDeletedSearchIndex(), - indexedConsensusRequest.getSearchIndex()); TSStatus result = stateMachine.write(indexedConsensusRequest); logDispatcher.offer(indexedConsensusRequest); return result; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 5af75547a3..9b528fa533 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.config.MultiLeaderConfig; import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl; @@ -212,15 +211,9 @@ public class LogDispatcher { List<TLogBatch> logBatches = new ArrayList<>(); long startIndex = syncStatus.getNextSendingIndex(); long maxIndex = impl.getController().getCurrentIndex() + 1; - logger.info("get batch. startIndex: {}, maxIndex: {}", startIndex, maxIndex); long endIndex; if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { // Use drainTo instead of poll to reduce lock overhead - logger.info( - "{} : pendingRequest Size: {}, bufferedRequest size: {}", - impl.getThisNode().getGroupId(), - pendingRequest.size(), - bufferedRequest.size()); pendingRequest.drainTo( bufferedRequest, config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size()); @@ -229,7 +222,8 @@ public class LogDispatcher { // only execute this after a restart endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches); batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.info("Empty {} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); + logger.debug( + "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch); } else { Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); IndexedConsensusRequest prev = iterator.next(); @@ -238,7 +232,7 @@ public class LogDispatcher { endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches); if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.info("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); + logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); return batch; } constructBatchIndexedFromConsensusRequest(prev, logBatches); @@ -254,8 +248,8 @@ public class LogDispatcher { constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches); if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.info( - "gap {} : accumulated a {} from queue and wal", + logger.debug( + "{} : accumulated a {} from queue and wal when gap", impl.getThisNode().getGroupId(), batch); return batch; @@ -293,45 +287,25 @@ public class LogDispatcher { private long constructBatchFromWAL( long currentIndex, long maxIndex, List<TLogBatch> logBatches) { - logger.info( - String.format( - "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d, iteratorIndex: %d", - peer.getGroupId().getId(), - peer.getEndpoint().ip, - currentIndex, - maxIndex, - iteratorIndex)); - long startTime = System.nanoTime(); - int count = 0; - try { - if (iteratorIndex != currentIndex) { - walEntryiterator.skipTo(currentIndex); - iteratorIndex = currentIndex; - } - while (currentIndex < maxIndex - && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { - try { - walEntryiterator.waitForNextReady(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - // TODO iterator - IConsensusRequest data = walEntryiterator.next(); - iteratorIndex++; - currentIndex++; - if (data != null) { - logBatches.add(new TLogBatch(data.serializeToByteBuffer())); - count++; - } + + if (iteratorIndex != currentIndex) { + walEntryiterator.skipTo(currentIndex); + iteratorIndex = currentIndex; + } + while (currentIndex < maxIndex + && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { + try { + walEntryiterator.waitForNextReady(); + } catch (InterruptedException e) { + e.printStackTrace(); } - return currentIndex - 1; - } finally { - double timeConsumed = (System.nanoTime() * 1.0 - startTime) / 1000_000; - logger.info( - String.format( - "DataRegion[%s]->%s: construct batch time consumed: %.3f. BatchCount: %d", - peer.getGroupId().getId(), peer.getEndpoint().ip, timeConsumed, count)); + // TODO iterator + IndexedConsensusRequest data = walEntryiterator.next(); + currentIndex = data.getSearchIndex(); + iteratorIndex = currentIndex; + logBatches.add(new TLogBatch(data.serializeToByteBuffer())); } + return currentIndex - 1; } private void constructBatchIndexedFromConsensusRequest( diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java index 5880e5de2d..b969c9c736 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.multileader.wal; import org.apache.iotdb.consensus.common.request.IConsensusRequest; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import java.util.Iterator; import java.util.List; @@ -68,7 +69,7 @@ public interface ConsensusReqReader { * @throws java.util.NoSuchElementException if the iteration has no more elements, wait a moment * or call {@link this#waitForNextReady} for more elements */ - IConsensusRequest next(); + IndexedConsensusRequest next(); /** * Wait for the next element in the iteration ready, blocked until next element is available. diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 26ba351a27..d27da1e114 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -896,7 +896,7 @@ public class IoTDBConfig { * Cache size of partition cache in {@link * org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher} */ - private int partitionCacheSize = 0; + private int partitionCacheSize = 100000; /** Cache size of user and role */ private int authorCacheSize = 100; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java index edade58e54..ebfed56211 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java @@ -278,7 +278,6 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) { TStorageGroupSchemaResp storageGroupSchemaResp = client.getMatchedStorageGroupSchemas(ROOT_PATH); - logger.info("fetch sg. target: {}", storageGroupSchemaResp.storageGroupSchemaMap.keySet()); if (storageGroupSchemaResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { Set<String> storageGroupNames = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 296580ad9a..93c08bfb40 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -88,7 +88,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { @Override public SettableFuture<ConfigTaskResult> setStorageGroup( SetStorageGroupStatement setStorageGroupStatement) { - LOGGER.info("set storage group task. {}", setStorageGroupStatement.getStorageGroupPath()); SettableFuture<ConfigTaskResult> future = SettableFuture.create(); // Construct request using statement TStorageGroupSchema storageGroupSchema = @@ -108,10 +107,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - LOGGER.info( - "set storage group task. {}. Status: {}", - setStorageGroupStatement.getStorageGroupPath(), - tsStatus.code); } catch (TException | IOException e) { future.setException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java index c35808e869..6626ce7f91 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java @@ -223,7 +223,6 @@ public class WALBuffer extends AbstractWALBuffer { if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) { currentSearchIndex = insertNode.getSearchIndex(); currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX; - logger.info("WALEntry searchIndex: {}", currentSearchIndex); } } return true; diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java index 2e890521f5..6862c6aeed 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.common.request.IConsensusRequest; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -237,7 +238,7 @@ public class WALNode implements IWALNode { } } - logger.info( + logger.debug( "Start deleting outdated wal files for wal node-{}, the first valid version id is {}, and the safely deleted search index is {}.", identifier, firstValidVersionId, @@ -759,7 +760,7 @@ public class WALNode implements IWALNode { } @Override - public IConsensusRequest next() { + public IndexedConsensusRequest next() { if (itr == null && !hasNext()) { throw new NoSuchElementException(); } @@ -774,14 +775,13 @@ public class WALNode implements IWALNode { } nextSearchIndex = insertNode.getSearchIndex() + 1; - return insertNode; + return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, insertNode); } @Override public void waitForNextReady() throws InterruptedException { while (!hasNext()) { buffer.waitForFlush(); - logger.info("awake from waiting. nextSearchIndex: {}", nextSearchIndex); } }
