This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 18ea0e9aadc add (#17717)
18ea0e9aadc is described below
commit 18ea0e9aadc2c39a1fcbf8e34c559ae142b71221
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 25 15:30:16 2026 +0800
add (#17717)
---
.../runtime/heartbeat/PipeHeartbeatParser.java | 13 ++
.../agent/task/connection/PipeEventCollector.java | 9 ++
.../subtask/processor/PipeProcessorSubtask.java | 7 +
.../event/common/terminate/PipeTerminateEvent.java | 179 +++++++++++++++++++++
...istoricalDataRegionTsFileAndDeletionSource.java | 26 +++
.../pipe/receiver/PipeReceiverStatusHandler.java | 29 ++++
6 files changed, 263 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 423353dd1fb..597120ffe1c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -157,11 +157,24 @@ public class PipeHeartbeatParser {
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
temporaryMeta.markDataNodeCompleted(nodeId);
+ LOGGER.info(
+ "Detected historical pipe completion report from DataNode {} for
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+ nodeId,
+ staticMeta.getPipeName(),
+ pipeHeartbeat.getRemainingEventCount(staticMeta),
+ pipeHeartbeat.getRemainingTime(staticMeta),
+ temporaryMeta.getCompletedDataNodeIds());
final Set<Integer> uncompletedDataNodeIds =
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
+ LOGGER.info(
+ "All DataNodes reported historical pipe {} completed.
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+ staticMeta.getPipeName(),
+ temporaryMeta.getGlobalRemainingEvents(),
+ temporaryMeta.getGlobalRemainingTime(),
+ staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
LOGGER.info(
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index ad44b78042a..7e6b0aa7781 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
import
org.apache.iotdb.db.pipe.source.schemaregion.PipePlanTreePrivilegeParseVisitor;
@@ -138,12 +139,20 @@ public class PipeEventCollector implements EventCollector
{
if (skipParsing || !forceTabletFormat &&
canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
+ if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+ PipeTerminateEvent.markHistoricalTsFileUnsplit(
+ sourceEvent.getPipeName(), sourceEvent.getCreationTime(),
regionId);
+ }
return;
}
try {
sourceEvent.consumeTabletInsertionEventsWithRetry(
this::collectParsedRawTableEvent,
"PipeEventCollector::parseAndCollectEvent");
+ if (sourceEvent.isGeneratedByHistoricalExtractor()) {
+ PipeTerminateEvent.markHistoricalTsFileSplit(
+ sourceEvent.getPipeName(), sourceEvent.getCreationTime(),
regionId);
+ }
} finally {
sourceEvent.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index c40b37b31e5..fe9737b0d7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -181,6 +182,12 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
}
},
"PipeProcessorSubtask::executeOnce");
+ if (tsFileInsertionEvent.isGeneratedByHistoricalExtractor()) {
+ PipeTerminateEvent.markHistoricalTsFileSplit(
+ tsFileInsertionEvent.getPipeName(),
+ tsFileInsertionEvent.getCreationTime(),
+ regionId);
+ }
if (ex.get() != null) {
throw ex.get();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 64209242cc4..12385c52b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -33,9 +33,16 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
@@ -45,6 +52,8 @@ import java.util.concurrent.TimeUnit;
*/
public class PipeTerminateEvent extends EnrichedEvent {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
private final int dataRegionId;
private final boolean shouldMark;
@@ -58,6 +67,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
// Do not use call run policy to avoid deadlock
private static final ExecutorService terminateExecutor =
createTerminateExecutor();
+ private static final ConcurrentMap<HistoricalTransferKey,
HistoricalTransferSummaryCounter>
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();
+
private static ExecutorService createTerminateExecutor() {
final WrappedThreadPoolExecutor executor =
new WrappedThreadPoolExecutor(
@@ -145,6 +157,18 @@ public class PipeTerminateEvent extends EnrichedEvent {
}
public void markCompleted() {
+ final HistoricalTransferSummary summary =
+ snapshotAndClearHistoricalTransferSummary(pipeName, creationTime,
dataRegionId);
+ if (Objects.nonNull(summary)) {
+ LOGGER.info(
+ "Pipe {}@{}: terminate event committed for historical transfer.
creationTime: {}, shouldMark: {}. {}",
+ pipeName,
+ dataRegionId,
+ creationTime,
+ shouldMark,
+ summary.toReportMessage());
+ }
+
// To avoid deadlock
if (shouldMark) {
terminateExecutor.submit(
@@ -159,4 +183,159 @@ public class PipeTerminateEvent extends EnrichedEvent {
+ " - "
+ super.toString();
}
+
+ public static void initializeHistoricalTransferSummary(
+ final String pipeName,
+ final long creationTime,
+ final int dataRegionId,
+ final long extractedHistoricalTsFileCount,
+ final long extractedHistoricalDeletionCount) {
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP
+ .computeIfAbsent(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
+ ignored -> new HistoricalTransferSummaryCounter())
+ .initialize(extractedHistoricalTsFileCount,
extractedHistoricalDeletionCount);
+ }
+
+ public static void markHistoricalTsFileSkipped(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime,
dataRegionId)
+ .skippedHistoricalTsFileCount
+ .incrementAndGet();
+ }
+
+ public static void markHistoricalTsFileSplit(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime,
dataRegionId)
+ .splitHistoricalTsFileCount
+ .incrementAndGet();
+ }
+
+ public static void markHistoricalTsFileUnsplit(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime,
dataRegionId)
+ .unsplitHistoricalTsFileCount
+ .incrementAndGet();
+ }
+
+ public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ return Objects.nonNull(counter) ? counter.snapshot() : null;
+ }
+
+ public static void clearHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ }
+
+ private static HistoricalTransferSummary
snapshotAndClearHistoricalTransferSummary(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ final HistoricalTransferSummaryCounter counter =
+ HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
+ return Objects.nonNull(counter) ? counter.snapshot() : null;
+ }
+
+ private static HistoricalTransferSummaryCounter
getOrCreateHistoricalTransferSummaryCounter(
+ final String pipeName, final long creationTime, final int dataRegionId) {
+ return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
+ new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
+ ignored -> new HistoricalTransferSummaryCounter());
+ }
+
+ public static final class HistoricalTransferSummary {
+
+ private final long extractedHistoricalTsFileCount;
+ private final long skippedHistoricalTsFileCount;
+ private final long splitHistoricalTsFileCount;
+ private final long unsplitHistoricalTsFileCount;
+ private final long extractedHistoricalDeletionCount;
+
+ private HistoricalTransferSummary(
+ final long extractedHistoricalTsFileCount,
+ final long skippedHistoricalTsFileCount,
+ final long splitHistoricalTsFileCount,
+ final long unsplitHistoricalTsFileCount,
+ final long extractedHistoricalDeletionCount) {
+ this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount;
+ this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount;
+ this.splitHistoricalTsFileCount = splitHistoricalTsFileCount;
+ this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount;
+ this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount;
+ }
+
+ public String toReportMessage() {
+ return String.format(
+ "historical summary: extractedTsFileCount=%s, skippedTsFileCount=%s,
splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s",
+ extractedHistoricalTsFileCount,
+ skippedHistoricalTsFileCount,
+ splitHistoricalTsFileCount,
+ unsplitHistoricalTsFileCount,
+ extractedHistoricalDeletionCount);
+ }
+ }
+
+ private static final class HistoricalTransferSummaryCounter {
+
+ private final AtomicLong extractedHistoricalTsFileCount = new
AtomicLong(0);
+ private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong(0);
+ private final AtomicLong splitHistoricalTsFileCount = new AtomicLong(0);
+ private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong(0);
+ private final AtomicLong extractedHistoricalDeletionCount = new
AtomicLong(0);
+
+ private void initialize(
+ final long extractedHistoricalTsFileCount, final long
extractedHistoricalDeletionCount) {
+ this.extractedHistoricalTsFileCount.set(extractedHistoricalTsFileCount);
+ this.skippedHistoricalTsFileCount.set(0);
+ this.splitHistoricalTsFileCount.set(0);
+ this.unsplitHistoricalTsFileCount.set(0);
+
this.extractedHistoricalDeletionCount.set(extractedHistoricalDeletionCount);
+ }
+
+ private HistoricalTransferSummary snapshot() {
+ return new HistoricalTransferSummary(
+ extractedHistoricalTsFileCount.get(),
+ skippedHistoricalTsFileCount.get(),
+ splitHistoricalTsFileCount.get(),
+ unsplitHistoricalTsFileCount.get(),
+ extractedHistoricalDeletionCount.get());
+ }
+ }
+
+ private static final class HistoricalTransferKey {
+
+ private final String pipeName;
+ private final long creationTime;
+ private final int dataRegionId;
+
+ private HistoricalTransferKey(
+ final String pipeName, final long creationTime, final int
dataRegionId) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ this.dataRegionId = dataRegionId;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof HistoricalTransferKey)) {
+ return false;
+ }
+ final HistoricalTransferKey that = (HistoricalTransferKey) obj;
+ return creationTime == that.creationTime
+ && dataRegionId == that.dataRegionId
+ && Objects.equals(pipeName, that.pipeName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipeName, creationTime, dataRegionId);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 053aac13887..e71d80a61a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -168,6 +168,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
new HashMap<>();
private final Map<PersistentResource, Long>
pendingResource2ReplicateIndexForIoTV2 =
new HashMap<>();
+ private int extractedHistoricalTsFileCount = 0;
+ private int extractedHistoricalDeletionCount = 0;
@Override
public void validate(final PipeParameterValidator validator) {
@@ -488,6 +490,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
return;
}
hasBeenStarted = true;
+ extractedHistoricalTsFileCount = 0;
+ extractedHistoricalDeletionCount = 0;
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(dataRegionId));
@@ -521,6 +525,12 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
:
o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex()));
pendingQueue = new ArrayDeque<>(originalResourceList);
+ PipeTerminateEvent.initializeHistoricalTransferSummary(
+ pipeName,
+ creationTime,
+ dataRegionId,
+ extractedHistoricalTsFileCount,
+ extractedHistoricalDeletionCount);
LOGGER.info(
DataNodePipeMessages.PIPE_FINISH_TO_SORT_ALL_EXTRACTED_RESOURCES,
@@ -649,6 +659,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
return true;
}
});
+ extractedHistoricalTsFileCount =
filteredTsFileResources2TableNames.size();
LOGGER.info(
"Pipe {}@{}: finish to extract historical TsFile, extracted sequence
file count {}/{}, "
@@ -798,6 +809,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
})
.collect(Collectors.toList());
resourceList.addAll(allDeletionResources);
+ extractedHistoricalDeletionCount = allDeletionResources.size();
LOGGER.info(
DataNodePipeMessages.PIPE_FINISH_TO_EXTRACT_DELETIONS_EXTRACT_DELETIONS,
pipeName,
@@ -841,6 +853,16 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
}
private Event supplyTerminateEvent() {
+ final PipeTerminateEvent.HistoricalTransferSummary
historicalTransferSummary =
+ PipeTerminateEvent.snapshotHistoricalTransferSummary(pipeName,
creationTime, dataRegionId);
+ if (Objects.nonNull(historicalTransferSummary)) {
+ LOGGER.info(
+ "Pipe {}@{}: historical source has supplied all events, emitting
terminate event. {}",
+ pipeName,
+ dataRegionId,
+ historicalTransferSummary.toReportMessage());
+ }
+
final PipeTerminateEvent terminateEvent =
new PipeTerminateEvent(
pipeName,
@@ -867,6 +889,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
}
filteredTsFileResources2TableNames.remove(resource);
+ PipeTerminateEvent.markHistoricalTsFileSkipped(pipeName, creationTime,
dataRegionId);
LOGGER.info(
DataNodePipeMessages.PIPE_SKIP_HISTORICAL_TSFILE_BECAUSE_REALTIME_SOURCE,
pipeName,
@@ -1074,6 +1097,9 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
@Override
public synchronized void close() {
+ if (!isTerminateSignalSent) {
+ PipeTerminateEvent.clearHistoricalTransferSummary(pipeName,
creationTime, dataRegionId);
+ }
if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 5cc9017df73..1a922448fc0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -49,6 +49,7 @@ public class PipeReceiverStatusHandler {
private static final String NO_PERMISSION = "No permission";
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
private static final String NO_PERMISSION_STR = "No permissions for this
operation";
+ private static final int MAX_RECORD_MESSAGE_LENGTH_IN_LOG = 2048;
private final boolean isRetryAllowedWhenConflictOccurs;
private final long retryMaxMillisWhenConflictOccurs;
@@ -147,6 +148,7 @@ public class PipeReceiverStatusHandler {
PipeMessages.USER_CONFLICT_NOT_ALLOWED,
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage : "not
recorded",
status);
+ logDiscardedUserConflictData("retry is not allowed", recordMessage,
status);
return;
}
@@ -160,6 +162,7 @@ public class PipeReceiverStatusHandler {
PipeMessages.USER_CONFLICT_RETRY_TIMEOUT,
shouldRecordIgnoredDataWhenConflictOccurs ? recordMessage :
"not recorded",
status);
+ logDiscardedUserConflictData("retry timeout", recordMessage,
status);
resetExceptionStatus();
return;
}
@@ -266,6 +269,32 @@ public class PipeReceiverStatusHandler {
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
}
+ private void logDiscardedUserConflictData(
+ final String reason, final String recordMessage, final TSStatus status) {
+ if (!LOGGER.isWarnEnabled()) {
+ return;
+ }
+
+ LOGGER.warn(
+ "User conflict exception: discarded data info because {}. data: {}.
receiver message: {}. status: {}",
+ reason,
+ summarizeRecordMessage(recordMessage),
+ status.getMessage(),
+ status);
+ }
+
+ private String summarizeRecordMessage(final String recordMessage) {
+ if (Objects.isNull(recordMessage) || recordMessage.isEmpty()) {
+ return "<empty>";
+ }
+
+ final String normalizedRecordMessage =
+ recordMessage.replace('\r', ' ').replace('\n', ' ').trim();
+ return normalizedRecordMessage.length() <= MAX_RECORD_MESSAGE_LENGTH_IN_LOG
+ ? normalizedRecordMessage
+ : normalizedRecordMessage.substring(0,
MAX_RECORD_MESSAGE_LENGTH_IN_LOG) + "...(truncated)";
+ }
+
private void recordExceptionStatusIfNecessary(final String message) {
if (!Objects.equals(exceptionRecordedMessage.get(), message)) {
exceptionFirstEncounteredTime.set(System.currentTimeMillis());