This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch test_wal_sync_pr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f4b1dc74c663a65930ee5fd1d43ca9670c9b2fc
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Jun 28 17:57:18 2022 +0800

    remove logs
---
 .../multileader/MultiLeaderServerImpl.java         |  4 --
 .../multileader/logdispatcher/LogDispatcher.java   | 70 +++++++---------------
 .../multileader/wal/ConsensusReqReader.java        |  3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  |  1 -
 .../config/executor/ClusterConfigTaskExecutor.java |  5 --
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  1 -
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  8 +--
 8 files changed, 29 insertions(+), 65 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 7682f118a8..272c244a1e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -104,10 +104,6 @@ public class MultiLeaderServerImpl {
     synchronized (stateMachine) {
       IndexedConsensusRequest indexedConsensusRequest =
           buildIndexedConsensusRequestForLocalRequest(request);
-      logger.info(
-          "index after build: safeIndex: {}, searchIndex: {}",
-          indexedConsensusRequest.getSafelyDeletedSearchIndex(),
-          indexedConsensusRequest.getSearchIndex());
       TSStatus result = stateMachine.write(indexedConsensusRequest);
       logDispatcher.offer(indexedConsensusRequest);
       return result;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 5af75547a3..9b528fa533 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -212,15 +211,9 @@ public class LogDispatcher {
       List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex = impl.getController().getCurrentIndex() + 1;
-      logger.info("get batch. startIndex: {}, maxIndex: {}", startIndex, 
maxIndex);
       long endIndex;
       if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
-        logger.info(
-            "{} : pendingRequest Size: {}, bufferedRequest size: {}",
-            impl.getThisNode().getGroupId(),
-            pendingRequest.size(),
-            bufferedRequest.size());
         pendingRequest.drainTo(
             bufferedRequest,
             config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
@@ -229,7 +222,8 @@ public class LogDispatcher {
         // only execute this after a restart
         endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches);
         batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.info("Empty {} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
+        logger.debug(
+            "{} : accumulated a {} from wal when empty", 
impl.getThisNode().getGroupId(), batch);
       } else {
         Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
@@ -238,7 +232,7 @@ public class LogDispatcher {
         endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), 
logBatches);
         if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
           batch = new PendingBatch(startIndex, endIndex, logBatches);
-          logger.info("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
+          logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
           return batch;
         }
         constructBatchIndexedFromConsensusRequest(prev, logBatches);
@@ -254,8 +248,8 @@ public class LogDispatcher {
                 constructBatchFromWAL(prev.getSearchIndex(), 
current.getSearchIndex(), logBatches);
             if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
               batch = new PendingBatch(startIndex, endIndex, logBatches);
-              logger.info(
-                  "gap {} : accumulated a {} from queue and wal",
+              logger.debug(
+                  "{} : accumulated a {} from queue and wal when gap",
                   impl.getThisNode().getGroupId(),
                   batch);
               return batch;
@@ -293,45 +287,25 @@ public class LogDispatcher {
 
     private long constructBatchFromWAL(
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
-      logger.info(
-          String.format(
-              "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d, 
iteratorIndex: %d",
-              peer.getGroupId().getId(),
-              peer.getEndpoint().ip,
-              currentIndex,
-              maxIndex,
-              iteratorIndex));
-      long startTime = System.nanoTime();
-      int count = 0;
-      try {
-        if (iteratorIndex != currentIndex) {
-          walEntryiterator.skipTo(currentIndex);
-          iteratorIndex = currentIndex;
-        }
-        while (currentIndex < maxIndex
-            && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
-          try {
-            walEntryiterator.waitForNextReady();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-          // TODO iterator
-          IConsensusRequest data = walEntryiterator.next();
-          iteratorIndex++;
-          currentIndex++;
-          if (data != null) {
-            logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
-            count++;
-          }
+
+      if (iteratorIndex != currentIndex) {
+        walEntryiterator.skipTo(currentIndex);
+        iteratorIndex = currentIndex;
+      }
+      while (currentIndex < maxIndex
+          && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
+        try {
+          walEntryiterator.waitForNextReady();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
         }
-        return currentIndex - 1;
-      } finally {
-        double timeConsumed = (System.nanoTime() * 1.0 - startTime) / 1000_000;
-        logger.info(
-            String.format(
-                "DataRegion[%s]->%s: construct batch time consumed: %.3f. 
BatchCount: %d",
-                peer.getGroupId().getId(), peer.getEndpoint().ip, 
timeConsumed, count));
+        // TODO iterator
+        IndexedConsensusRequest data = walEntryiterator.next();
+        currentIndex = data.getSearchIndex();
+        iteratorIndex = currentIndex;
+        logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
       }
+      return currentIndex - 1;
     }
 
     private void constructBatchIndexedFromConsensusRequest(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index 5880e5de2d..b969c9c736 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.wal;
 
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import java.util.Iterator;
 import java.util.List;
@@ -68,7 +69,7 @@ public interface ConsensusReqReader {
      * @throws java.util.NoSuchElementException if the iteration has no more 
elements, wait a moment
      *     or call {@link this#waitForNextReady} for more elements
      */
-    IConsensusRequest next();
+    IndexedConsensusRequest next();
 
     /**
      * Wait for the next element in the iteration ready, blocked until next 
element is available.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 26ba351a27..d27da1e114 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -896,7 +896,7 @@ public class IoTDBConfig {
    * Cache size of partition cache in {@link
    * org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher}
    */
-  private int partitionCacheSize = 0;
+  private int partitionCacheSize = 100000;
 
   /** Cache size of user and role */
   private int authorCacheSize = 100;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index edade58e54..ebfed56211 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -278,7 +278,6 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
           
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
         TStorageGroupSchemaResp storageGroupSchemaResp =
             client.getMatchedStorageGroupSchemas(ROOT_PATH);
-        logger.info("fetch sg. target: {}", 
storageGroupSchemaResp.storageGroupSchemaMap.keySet());
         if (storageGroupSchemaResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           Set<String> storageGroupNames =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 296580ad9a..93c08bfb40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -88,7 +88,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
   @Override
   public SettableFuture<ConfigTaskResult> setStorageGroup(
       SetStorageGroupStatement setStorageGroupStatement) {
-    LOGGER.info("set storage group task. {}", 
setStorageGroupStatement.getStorageGroupPath());
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     // Construct request using statement
     TStorageGroupSchema storageGroupSchema =
@@ -108,10 +107,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
-      LOGGER.info(
-          "set storage group task. {}. Status: {}",
-          setStorageGroupStatement.getStorageGroupPath(),
-          tsStatus.code);
     } catch (TException | IOException e) {
       future.setException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index c35808e869..6626ce7f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -223,7 +223,6 @@ public class WALBuffer extends AbstractWALBuffer {
         if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
           currentSearchIndex = insertNode.getSearchIndex();
           currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
-          logger.info("WALEntry searchIndex: {}", currentSearchIndex);
         }
       }
       return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 2e890521f5..6862c6aeed 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -237,7 +238,7 @@ public class WALNode implements IWALNode {
         }
       }
 
-      logger.info(
+      logger.debug(
           "Start deleting outdated wal files for wal node-{}, the first valid 
version id is {}, and the safely deleted search index is {}.",
           identifier,
           firstValidVersionId,
@@ -759,7 +760,7 @@ public class WALNode implements IWALNode {
     }
 
     @Override
-    public IConsensusRequest next() {
+    public IndexedConsensusRequest next() {
       if (itr == null && !hasNext()) {
         throw new NoSuchElementException();
       }
@@ -774,14 +775,13 @@ public class WALNode implements IWALNode {
       }
       nextSearchIndex = insertNode.getSearchIndex() + 1;
 
-      return insertNode;
+      return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, 
insertNode);
     }
 
     @Override
     public void waitForNextReady() throws InterruptedException {
       while (!hasNext()) {
         buffer.waitForFlush();
-        logger.info("awake from waiting. nextSearchIndex: {}", 
nextSearchIndex);
       }
     }
 

Reply via email to