This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cc4dc0a9ed0 Pipe: Reduced pipe logs and controlled how often the log
is printed below a certain frequency (#11973)
cc4dc0a9ed0 is described below
commit cc4dc0a9ed0031a93cb24320ba503a5f2873938f
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 19 19:43:08 2024 +0800
Pipe: Reduced pipe logs and controlled how often the log is printed below a
certain frequency (#11973)
- Reduced meta report & tsFile pin & wal pin logs according to log scales
- Reduced connector event messages to core level
- Downgraded the retry queue logs in async connector
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 21 ++++++++-
.../thrift/async/IoTDBThriftAsyncConnector.java | 9 ++--
.../PipeTransferTabletBatchEventHandler.java | 9 +++-
.../PipeTransferTabletInsertionEventHandler.java | 4 +-
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 34 ++++++++++---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 9 ++++
.../common/tablet/PipeRawTabletInsertionEvent.java | 13 +++++
.../db/pipe/resource/PipeResourceManager.java | 7 +++
.../iotdb/db/pipe/resource/log/PipeLogManager.java | 40 ++++++++++++++++
.../iotdb/db/pipe/resource/log/PipeLogStatus.java | 52 ++++++++++++++++++++
.../db/pipe/resource/memory/PipeMemoryBlock.java | 5 ++
.../resource/tsfile/PipeTsFileResourceManager.java | 20 ++++++--
.../pipe/resource/wal/PipeWALResourceManager.java | 23 +++++++--
.../subtask/connector/PipeConnectorSubtask.java | 12 +++--
.../dataregion/wal/utils/WALEntryHandler.java | 5 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 55 ++++++++++++++++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 31 ++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 33 +++++++++++++
.../commons/pipe/task/meta/PipeMetaKeeper.java | 4 ++
19 files changed, 357 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 4f7dde606c8..299814b57af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -276,10 +277,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
try {
+ final Optional<Logger> logger =
+ PipeResourceManager.log()
+ .schedule(
+ PipeTaskDataNodeAgent.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+ PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+ logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()));
}
+ LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (IOException e) {
throw new TException(e);
}
@@ -306,10 +315,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
try {
+ final Optional<Logger> logger =
+ PipeResourceManager.log()
+ .schedule(
+ PipeTaskDataNodeAgent.class,
+ PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+ PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+ pipeMetaKeeper.getPipeMetaCount());
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+ logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()));
}
+ LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (IOException e) {
throw new TException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index b182e35adcd..f7d37a343ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -330,8 +330,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
peekedEvent,
polledEvent);
}
- if (polledEvent != null) {
- LOGGER.info("Polled event {} from retry queue.", polledEvent);
+ if (polledEvent != null && LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}
}
}
@@ -354,8 +354,9 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
*/
public synchronized void addFailureEventToRetryQueue(Event event) {
retryEventQueue.offer(event);
-
- LOGGER.info("Added event {} to retry queue.", event);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Added event {} to retry queue.", event);
+ }
}
//////////////////////////// Operations for close
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 5e9ddf66ca1..1100ed7f697 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.stream.Collectors;
public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<TPipeTransferResp> {
@@ -87,7 +88,13 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
public void onError(Exception exception) {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent batch {} (request commit
ids={}).",
- events,
+ events.stream()
+ .map(
+ event ->
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event.toString())
+ .collect(Collectors.toList()),
requestCommitIds,
exception);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 6a7f79a3162..9f026c3d915 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -92,7 +92,9 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
public void onError(Exception exception) {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent {} (committer key={}, commit
id={}).",
- event,
+ event instanceof EnrichedEvent
+ ? ((EnrichedEvent) event).coreReportMessage()
+ : event.toString(),
event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId()
: null,
exception);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 894c5eae035..6b83e0023d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -257,18 +257,40 @@ public abstract class EnrichedEvent implements Event {
+ referenceCount.get()
+ ", pipeName='"
+ pipeName
- + '\''
- + ", pipeTaskMeta="
+ + "', pipeTaskMeta="
+ pipeTaskMeta
+ ", committerKey='"
+ committerKey
- + '\''
- + ", commitId="
+ + "', commitId="
+ commitId
+ ", pattern='"
+ pattern
- + '\''
- + ", startTime="
+ + "', startTime="
+ + startTime
+ + ", endTime="
+ + endTime
+ + ", isPatternParsed="
+ + isPatternParsed
+ + ", isTimeParsed="
+ + isTimeParsed
+ + ", shouldReportOnCommit="
+ + shouldReportOnCommit
+ + '}';
+ }
+
+ public String coreReportMessage() {
+ return "EnrichedEvent{"
+ + "referenceCount="
+ + referenceCount.get()
+ + ", pipeName='"
+ + pipeName
+ + "', committerKey='"
+ + committerKey
+ + "', commitId="
+ + commitId
+ + ", pattern='"
+ + pattern
+ + "', startTime="
+ startTime
+ ", endTime="
+ endTime
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 4ae1b9ee64d..652cda59593 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -255,4 +255,13 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
+ walEntryHandler, progressIndex, isAligned, isGeneratedByPipe)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index eca75a95cec..1b843265515 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -219,4 +219,17 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s}",
+ tablet,
+ isAligned,
+ sourceEvent == null ? "null" : sourceEvent.coreReportMessage(),
+ needToReport,
+ allocatedMemoryBlock)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 56fe8cd6bd0..35b872e40b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.resource;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.log.PipeLogManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -33,6 +34,7 @@ public class PipeResourceManager {
private final PipeTsFileResourceManager pipeTsFileResourceManager;
private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
private final PipeMemoryManager pipeMemoryManager;
+ private final PipeLogManager pipeLogManager;
public static PipeTsFileResourceManager tsfile() {
return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
@@ -56,12 +58,17 @@ public class PipeResourceManager {
return PipeResourceManagerHolder.INSTANCE.pipeMemoryManager;
}
+ public static PipeLogManager log() {
+ return PipeResourceManagerHolder.INSTANCE.pipeLogManager;
+ }
+
///////////////////////////// SINGLETON /////////////////////////////
private PipeResourceManager() {
pipeTsFileResourceManager = new PipeTsFileResourceManager();
pipeWALResourceManager = new AtomicReference<>();
pipeMemoryManager = new PipeMemoryManager();
+ pipeLogManager = new PipeLogManager();
}
private static class PipeResourceManagerHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
new file mode 100644
index 00000000000..c7f6e445b6a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeLogManager {
+
+ private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap =
+ new ConcurrentHashMap<>();
+
+ public Optional<Logger> schedule(
+ Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) {
+ return logClass2LogStatusMap
+ .computeIfAbsent(
+ logClass, k -> new PipeLogStatus(logClass, maxAverageScale, maxLogInterval))
+ .schedule(scale);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
new file mode 100644
index 00000000000..67355dcc01a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+class PipeLogStatus {
+
+ private final Logger logger;
+
+ private final int maxAverageScale;
+ private final int maxLogInterval;
+ private final AtomicLong currentRounds = new AtomicLong(0);
+
+ PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
+ logger = LoggerFactory.getLogger(logClass);
+
+ this.maxAverageScale = maxAverageScale;
+ this.maxLogInterval = maxLogInterval;
+ }
+
+ synchronized Optional<Logger> schedule(int scale) {
+ if (currentRounds.incrementAndGet()
+ >= Math.min((int) Math.ceil((double) scale / maxAverageScale), maxLogInterval)) {
+ currentRounds.set(0);
+ return Optional.of(logger);
+ }
+
+ return Optional.empty();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index e0306eea7da..98947caab56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -158,6 +158,11 @@ public class PipeMemoryBlock implements AutoCloseable {
isReleased = true;
}
+ @Override
+ public String toString() {
+ return "PipeMemoryBlock{" + "memoryUsageInBytes=" + memoryUsageInBytes.get() + '}';
+ }
+
@Override
public void close() {
while (true) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 9aadda86ae3..00f5d1c964a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,6 +38,7 @@ import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -78,6 +80,14 @@ public class PipeTsFileResourceManager {
private void ttlCheck() {
final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+ final Optional<Logger> logger =
+ PipeResourceManager.log()
+ .schedule(
+ PipeTsFileResourceManager.class,
+ PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
+ PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds(),
+ hardlinkOrCopiedFileToPipeTsFileResourceMap.size());
+
while (iterator.hasNext()) {
final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
@@ -85,10 +95,12 @@ public class PipeTsFileResourceManager {
if (entry.getValue().closeIfOutOfTimeToLive()) {
iterator.remove();
} else {
- LOGGER.info(
- "Pipe file (file name: {}) is still referenced {} times",
- entry.getKey(),
- entry.getValue().getReferenceCount());
+ logger.ifPresent(
+ l ->
+ l.info(
+ "Pipe file (file name: {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount()));
}
} catch (IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index aa8f4e71623..7e0adbacd32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.resource.wal;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.slf4j.Logger;
@@ -30,6 +32,7 @@ import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -61,6 +64,14 @@ public abstract class PipeWALResourceManager {
private void ttlCheck() {
final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
memtableIdToPipeWALResourceMap.entrySet().iterator();
+ final Optional<Logger> logger =
+ PipeResourceManager.log()
+ .schedule(
+ PipeWALResourceManager.class,
+ PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(),
+ PipeConfig.getInstance().getPipeWalPinMaxLogIntervalRounds(),
+ memtableIdToPipeWALResourceMap.size());
+
try {
while (iterator.hasNext()) {
final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -71,11 +82,13 @@ public abstract class PipeWALResourceManager {
try {
if (entry.getValue().invalidateIfPossible()) {
iterator.remove();
- } else if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "WAL (memtableId {}) is still referenced {} times",
- entry.getKey(),
- entry.getValue().getReferenceCount());
+ } else {
+ logger.ifPresent(
+ l ->
+ l.info(
+ "WAL (memtableId {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount()));
}
} finally {
lock.unlock();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 167ef3e538f..bc30ee2b0d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -75,8 +75,8 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask
{
// when no event can be pulled.
private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
- private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS =
- PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+ private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
+ PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() * 1000;
private long lastHeartbeatEventInjectTime = System.currentTimeMillis();
public PipeConnectorSubtask(
@@ -131,7 +131,7 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
try {
if (event == null) {
if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
- > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS) {
+ > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
}
return false;
@@ -166,7 +166,11 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
throw new PipeException(
String.format(
"Exception in pipe transfer, subtask: %s, last event: %s, root
cause: %s",
- taskID, lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()),
+ taskID,
+ lastEvent instanceof EnrichedEvent
+ ? ((EnrichedEvent) lastEvent).coreReportMessage()
+ : lastEvent.toString(),
+ ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 821263c6858..ea9570033ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -33,8 +34,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
/**
- * This handler is used by the Pipe to find the corresponding insert node.
Besides, it can try to
- * pin/unpin the wal entries by the memTable id.
+ * This handler is used by the Pipe to find the corresponding {@link
InsertNode}. Besides, it can
+ * try to pin/unpin the {@link WALEntry}s by the memTable id.
*/
public class WALEntryHandler {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 114f759d759..0a3d1381663 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -189,6 +189,13 @@ public class CommonConfig {
private long pipeMaxAllowedLinkedTsFileCount = 100;
private long pipeStuckRestartIntervalSeconds = 120;
+ private int pipeMetaReportMaxLogNumPerRound = 10;
+ private int pipeMetaReportMaxLogIntervalRounds = 36;
+ private int pipeTsFilePinMaxLogNumPerRound = 10;
+ private int pipeTsFilePinMaxLogIntervalRounds = 90;
+ private int pipeWalPinMaxLogNumPerRound = 10;
+ private int pipeWalPinMaxLogIntervalRounds = 90;
+
private boolean pipeMemoryManagementEnabled = true;
private long pipeMemoryAllocateRetryIntervalMs = 1000;
private int pipeMemoryAllocateMaxRetries = 10;
@@ -787,6 +794,54 @@ public class CommonConfig {
this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
}
+ public int getPipeMetaReportMaxLogNumPerRound() {
+ return pipeMetaReportMaxLogNumPerRound;
+ }
+
+ public void setPipeMetaReportMaxLogNumPerRound(int
pipeMetaReportMaxLogNumPerRound) {
+ this.pipeMetaReportMaxLogNumPerRound = pipeMetaReportMaxLogNumPerRound;
+ }
+
+ public int getPipeMetaReportMaxLogIntervalRounds() {
+ return pipeMetaReportMaxLogIntervalRounds;
+ }
+
+ public void setPipeMetaReportMaxLogIntervalRounds(int
pipeMetaReportMaxLogIntervalRounds) {
+ this.pipeMetaReportMaxLogIntervalRounds =
pipeMetaReportMaxLogIntervalRounds;
+ }
+
+ public int getPipeTsFilePinMaxLogNumPerRound() {
+ return pipeTsFilePinMaxLogNumPerRound;
+ }
+
+ public void setPipeTsFilePinMaxLogNumPerRound(int
pipeTsFilePinMaxLogNumPerRound) {
+ this.pipeTsFilePinMaxLogNumPerRound = pipeTsFilePinMaxLogNumPerRound;
+ }
+
+ public int getPipeTsFilePinMaxLogIntervalRounds() {
+ return pipeTsFilePinMaxLogIntervalRounds;
+ }
+
+ public void setPipeTsFilePinMaxLogIntervalRounds(int
pipeTsFilePinMaxLogIntervalRounds) {
+ this.pipeTsFilePinMaxLogIntervalRounds = pipeTsFilePinMaxLogIntervalRounds;
+ }
+
+ public int getPipeWalPinMaxLogNumPerRound() {
+ return pipeWalPinMaxLogNumPerRound;
+ }
+
+ public void setPipeWalPinMaxLogNumPerRound(int pipeWalPinMaxLogNumPerRound) {
+ this.pipeWalPinMaxLogNumPerRound = pipeWalPinMaxLogNumPerRound;
+ }
+
+ public int getPipeWalPinMaxLogIntervalRounds() {
+ return pipeWalPinMaxLogIntervalRounds;
+ }
+
+ public void setPipeWalPinMaxLogIntervalRounds(int
pipeWalPinMaxLogIntervalRounds) {
+ this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds;
+ }
+
public boolean getPipeMemoryManagementEnabled() {
return pipeMemoryManagementEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f6dfe120db0..5faf8723817 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -444,6 +444,37 @@ public class CommonDescriptor {
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
+ config.setPipeMetaReportMaxLogNumPerRound(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_meta_report_max_log_num_per_round",
+ String.valueOf(config.getPipeMetaReportMaxLogNumPerRound()))));
+ config.setPipeMetaReportMaxLogIntervalRounds(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_meta_report_max_log_interval_rounds",
+ String.valueOf(config.getPipeMetaReportMaxLogIntervalRounds()))));
+ config.setPipeTsFilePinMaxLogNumPerRound(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_tsfile_pin_max_log_num_per_round",
+ String.valueOf(config.getPipeTsFilePinMaxLogNumPerRound()))));
+ config.setPipeTsFilePinMaxLogIntervalRounds(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_tsfile_pin_max_log_interval_rounds",
+ String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds()))));
+ config.setPipeWalPinMaxLogNumPerRound(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_wal_pin_max_log_num_per_round",
+ String.valueOf(config.getPipeWalPinMaxLogNumPerRound()))));
+ config.setPipeWalPinMaxLogIntervalRounds(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_wal_pin_max_log_interval_rounds",
+ String.valueOf(config.getPipeWalPinMaxLogIntervalRounds()))));
+
config.setPipeMemoryManagementEnabled(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 219701f74ee..80223a4439c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -185,6 +185,32 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
}
+ /////////////////////////////// Logger ///////////////////////////////
+
+ public int getPipeMetaReportMaxLogNumPerRound() {
+ return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
+ }
+
+ public int getPipeMetaReportMaxLogIntervalRounds() {
+ return COMMON_CONFIG.getPipeMetaReportMaxLogIntervalRounds();
+ }
+
+ public int getPipeTsFilePinMaxLogNumPerRound() {
+ return COMMON_CONFIG.getPipeTsFilePinMaxLogNumPerRound();
+ }
+
+ public int getPipeTsFilePinMaxLogIntervalRounds() {
+ return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds();
+ }
+
+ public int getPipeWalPinMaxLogNumPerRound() {
+ return COMMON_CONFIG.getPipeWalPinMaxLogNumPerRound();
+ }
+
+ public int getPipeWalPinMaxLogIntervalRounds() {
+ return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds();
+ }
+
/////////////////////////////// Memory ///////////////////////////////
public boolean getPipeMemoryManagementEnabled() {
@@ -284,6 +310,13 @@ public class PipeConfig {
LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}",
getPipeMaxAllowedLinkedTsFileCount());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
+ LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
+ LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
+ LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}",
getPipeTsFilePinMaxLogNumPerRound());
+ LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}",
getPipeTsFilePinMaxLogIntervalRounds());
+ LOGGER.info("PipeWalPinMaxLogNumPerRound: {}",
getPipeWalPinMaxLogNumPerRound());
+ LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}",
getPipeWalPinMaxLogIntervalRounds());
+
LOGGER.info("PipeMemoryManagementEnabled: {}",
getPipeMemoryManagementEnabled());
LOGGER.info("PipeMemoryAllocateMaxRetries: {}",
getPipeMemoryAllocateMaxRetries());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 003eec9bdd8..742d2db7135 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -89,6 +89,10 @@ public class PipeMetaKeeper {
return pipeNameToPipeMetaMap.values();
}
+ public int getPipeMetaCount() {
+ return pipeNameToPipeMetaMap.size();
+ }
+
public PipeMeta getPipeMetaByPipeName(String pipeName) {
return pipeNameToPipeMetaMap.get(pipeName);
}