This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b616502aec0 Pipe Log: Added the remaining Chinese logs & Further
reduced the repeatable logs (#17700)
b616502aec0 is described below
commit b616502aec0b700d0b7f3a1577e9ecee1edc365b
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 29 14:12:58 2026 +0800
Pipe Log: Added the remaining Chinese logs & Further reduced the repeatable
logs (#17700)
* zh
* h
---
.../iotdb/confignode/i18n/ManagerMessages.java | 14 +--
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 19 ++++
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 73 +++++++++------
.../pipe/agent/plugin/PipeDataNodePluginAgent.java | 22 +++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 10 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 28 ++++--
.../iotconsensusv2/IoTConsensusV2AsyncSink.java | 103 +++++++++++++++------
.../handler/IoTConsensusV2DeleteEventHandler.java | 16 +++-
.../IoTConsensusV2TabletBatchEventHandler.java | 20 +++-
.../IoTConsensusV2TabletInsertionEventHandler.java | 17 +++-
.../IoTConsensusV2TsFileInsertionEventHandler.java | 18 +++-
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 3 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 33 +++++--
.../handler/PipeTransferTrackableHandler.java | 21 +++--
.../async/handler/PipeTransferTsFileHandler.java | 23 ++++-
.../PipeRealtimeDataRegionHybridSource.java | 4 +-
.../realtime/PipeRealtimeDataRegionLogSource.java | 5 +-
.../realtime/PipeRealtimeDataRegionSource.java | 8 +-
.../PipeRealtimeDataRegionTsFileSource.java | 5 +-
.../apache/iotdb/commons/i18n/PipeMessages.java | 14 +++
.../apache/iotdb/commons/i18n/PipeMessages.java | 14 +++
.../pipe/datastructure/pattern/TreePattern.java | 36 +++----
22 files changed, 351 insertions(+), 155 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index 836fe7dd60b..e7321e5376c 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -28,11 +28,11 @@ public final class ManagerMessages {
public static final String
AFTER_THIS_SUCCESSFUL_SYNC_IF_SUBSCRIPTIONINFO_IS_EMPTY_DURING_THIS =
"After this successful sync, if SubscriptionInfo is empty during this
sync and has not been modified afterwards, all subsequent syncs will be
skipped";
public static final String
ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA =
- "Attempt to report pipe exception to a null PipeTaskMeta.";
+ "尝试向空的 PipeTaskMeta 上报 pipe 异常。";
public static final String AUTH_RUN_AUTH_PLAN = "Auth: run auth plan: {}";
public static final String CLUSTERID = "clusterID: {}";
public static final String COLLECTING_PIPE_HEARTBEAT_FROM_DATA_NODES =
- "Collecting pipe heartbeat {} from data nodes";
+ "正在从 data nodes 收集 pipe 心跳 {}";
public static final String CONNECTION_FROM_DATANODE_TO_DATANODE_IS_BROKEN =
"Connection from DataNode {} to DataNode {} is broken";
public static final String CONSENSUSGROUPSTATISTICS =
"[ConsensusGroupStatistics]\t {}: {} -> {}";
@@ -128,7 +128,7 @@ public final class ManagerMessages {
public static final String FAILED_TO_CREATE_PEER_FOR_CONSENSUS_GROUP =
"Failed to create peer for consensus group";
public static final String FAILED_TO_CREATE_PIPE_RESULT_STATUS =
- "Failed to create pipe {}. Result status: {}.";
+ "创建 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_CREATE_SUBTASK_FOR_PIPE_CREATION_TIME =
"Failed to create subtask for pipe %s, creation time %d";
public static final String
FAILED_TO_CREATE_TOPIC_WITH_ATTRIBUTES_RESULT_STATUS =
@@ -143,7 +143,7 @@ public final class ManagerMessages {
public static final String
FAILED_TO_DEREGISTER_PIPE_TEMPORARY_META_METRICS_PIPETEMPORARYMETA_DOES_NOT =
"Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({})
does not exist";
public static final String FAILED_TO_DROP_PIPE_RESULT_STATUS =
- "Failed to drop pipe {}. Result status: {}.";
+ "删除 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_GET_ALL_PIPE_INFO = "Failed to get all
pipe info.";
public static final String FAILED_TO_GET_ALL_SUBSCRIPTION_INFO =
"Failed to get all subscription info.";
@@ -162,9 +162,9 @@ public final class ManagerMessages {
public static final String FAILED_TO_SHOW_SUBSCRIPTION_INFO = "Failed to
show subscription info.";
public static final String FAILED_TO_SHOW_TOPIC_INFO = "Failed to show topic
info.";
public static final String FAILED_TO_START_PIPE_RESULT_STATUS =
- "Failed to start pipe {}. Result status: {}.";
+ "启动 pipe {} 失败。结果状态:{}。";
public static final String FAILED_TO_STOP_PIPE_RESULT_STATUS =
- "Failed to stop pipe {}. Result status: {}.";
+ "停止 pipe {} 失败。结果状态:{}。";
public static final String
FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_CREATION_FOR =
"Failed to submit async consensus pipe creation for {}: {}";
public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_DROP_FOR =
@@ -172,7 +172,7 @@ public final class ManagerMessages {
public static final String FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS =
"Failed to sync consumer group meta. Result status: {}.";
public static final String FAILED_TO_SYNC_PIPE_META_RESULT_STATUS =
- "Failed to sync pipe meta. Result status: {}.";
+ "同步 pipe 元数据失败。结果状态:{}。";
public static final String
FAILED_TO_SYNC_TEMPLATE_EXTENSION_INFO_TO_DATANODE =
"Failed to sync template {} extension info to DataNode {}";
public static final String FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS =
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 43ccba45cae..096aa07914b 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -142,6 +142,16 @@ public final class DataNodePipeMessages {
"Failed to persist progress index to configNode, status: {}";
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
"Failure when register pipe plugin {}. Skip this plugin and continue
startup.";
+ public static final String
+ FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
+ "Failed to register PipePlugin %s, because the given PipePlugin name
is the same as a built-in PipePlugin name.";
+ public static final String
+ FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
+ "Failed to register PipePlugin %s(%s), because its instance can not
be constructed successfully. Exception: %s";
+ public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
+ "Failed to register PipePlugin %s, because existed md5 of jar file for
pipe plugin %s is different from the new jar file.";
+ public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
+ "Failed to deregister builtin PipePlugin %s.";
public static final String PIPECONNECTOR = "PipeConnector: ";
public static final String
PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}";
@@ -439,8 +449,15 @@ public final class DataNodePipeMessages {
public static final String FAILED_TO_START_SOURCES = "failed to start
sources.";
public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
"Heartbeat Event {} can not be supplied because the reference count can
not be increased";
+ public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
+ "Event %s can not be supplied because the reference count can not be
increased, the data represented by this event is lost";
public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
"Interrupted waiting for processor to stop";
+ public static final String
INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
+ "Interrupted when waiting for parsing privilege for TsFile %s.";
+ public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
+ "Parse TsFile %s when checking privilege error. Because: %s";
+ public static final String READ_TSFILE_ERROR = "Read TsFile %s error.";
public static final String
IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
"IoTDBSchemaRegionSource does not support transferring events under
simple consensus";
public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT =
@@ -826,6 +843,8 @@ public final class DataNodePipeMessages {
public static final String REDIRECT_FILE_POSITION_TO = "Redirect file
position to {}.";
public static final String REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE =
"Redirect to position {} in transferring tsFile {}.";
+ public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
+ "Network failed to receive tsFile %s, status: %s";
public static final String SECURITY_DIR = "security dir: {}";
public static final String SECURITY_PKI_DIR = "security pki dir: {}";
public static final String SUCCESSFULLY_ADDED_ITEM = "Successfully added
item {}.";
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index adb27e50901..4d514c19ba5 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -84,7 +84,7 @@ public final class DataNodePipeMessages {
// ===================== AGENT =====================
public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A =
- "Attempt to report pipe exception to a null PipeTaskMeta.";
+ "尝试向空的 PipeTaskMeta 上报 pipe 异常。";
public static final String CANNOT_PARSE_REBOOT_TIMES_FROM_FILE_SET =
"无法解析 reboot times from file {}, set the current time in seconds ({}) as
the reboot times";
public static final String CANNOT_RECORD_REBOOT_TIMES_TO_FILE_THE =
@@ -92,11 +92,11 @@ public final class DataNodePipeMessages {
public static final String
CANNOT_START_SIMPLEPROGRESSINDEXASSIGNER_BECAUSE_OF =
"无法启动 SimpleProgressIndexAssigner because of {}";
public static final String CREATE_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
- "创建 pipe DN task {} successfully within {} ms";
+ "创建 pipe DN task {} 成功,耗时 {} ms";
public static final String
DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
- "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+ "注销子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
- "Drop pipe DN task {} successfully within {} ms";
+ "删除 pipe DN task {} 成功,耗时 {} ms";
public static final String
ERROR_OCCURRED_WHEN_COLLECTING_EVENTS_FROM_PROCESSOR =
"collecting events from processor 时发生错误";
public static final String
EXCEPTION_IN_PIPE_EVENT_PROCESSING_IGNORED_BECAUSE =
@@ -132,9 +132,19 @@ public final class DataNodePipeMessages {
"获取 pipe task meta from config node. Ignore the exception 失败,原因:config
node may not be "
+ "ready yet, and meta will be pushed by config node later.";
public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
- "persist progress index to configNode, status: {} 失败";
+ "持久化 progress index 到 configNode 失败,状态:{}";
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
- "Failure when register pipe plugin {}. Skip this plugin and continue
startup.";
+ "注册 pipe plugin {} 失败。将跳过该插件并继续启动。";
+ public static final String
+ FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
+ "注册 PipePlugin %s 失败,因为给定的 PipePlugin 名称与内置 PipePlugin 名称重复。";
+ public static final String
+ FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
+ "注册 PipePlugin %s(%s) 失败,因为其实例无法成功构造。异常:%s";
+ public static final String
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
+ "注册 PipePlugin %s 失败,因为 pipe plugin %s 已存在的 jar 文件 MD5 与新的 jar 文件不同。";
+ public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
+ "注销内置 PipePlugin %s 失败。";
public static final String PIPECONNECTOR = "PipeConnector: ";
public static final String
PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}";
@@ -157,51 +167,51 @@ public final class DataNodePipeMessages {
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable
rate limiter in "
+ "sending tsfile by default to reserve disk and network IO for
realtime sending.";
public static final String
PIPEEVENTCOLLECTOR_THE_EVENT_IS_ALREADY_RELEASED_SKIPPING =
- "PipeEventCollector: The event {} is already released, skipping it.";
+ "PipeEventCollector:事件 {} 已被释放,跳过处理。";
public static final String PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS =
"Pipe:connector subtask {} ({}) 已关闭 within {} ms";
- public static final String PIPE_META_NOT_FOUND = "Pipe meta not found: ";
+ public static final String PIPE_META_NOT_FOUND = "未找到 pipe 元数据:";
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {}
and "
+ "callbackExecutor {}.";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
- "Pipe skipping temporary TsFile which shouldn't be transferred: {}";
+ "Pipe 跳过不应传输的临时 TsFile:{}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
- "Pulled pipe meta from config node: {}, recovering ...";
+ "已从 config node 拉取 pipe 元数据:{},正在恢复 ...";
public static final String RECEIVED_PIPE_HEARTBEAT_REQUEST_FROM_CONFIG_NODE =
- "Received pipe heartbeat request {} from config node.";
+ "收到来自 config node 的 pipe 心跳请求 {}。";
public static final String
REGION_NO_TSFILEINSERTIONEVENTS_TO_REPLACE_FOR_SOURCE =
"Region {}: No TsFileInsertionEvents to replace for source files {}";
public static final String REGION_REPLACED_TSFILEINSERTIONEVENTS_WITH =
"Region {}: Replaced TsFileInsertionEvents {} with {}";
- public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount < 0";
- public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount <=
0";
+ public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount 小于
0";
+ public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount
小于等于 0";
public static final String
REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
- "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+ "注册子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String
REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE =
- "Report PipeRuntimeException to local PipeTaskMeta({}), exception
message: {}";
- public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount < 0";
- public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount <= 0";
+ "向本地 PipeTaskMeta({}) 上报 PipeRuntimeException,异常信息:{}";
+ public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount 小于 0";
+ public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount 小于等于 0";
public static final String
SIMPLEPROGRESSINDEXASSIGNER_STARTED_SUCCESSFULLY_ISSIMPLECONSENSUSENABLE_R =
- "SimpleProgressIndexAssigner started successfully.
isSimpleConsensusEnable: {}, "
+ "SimpleProgressIndexAssigner 启动成功。isSimpleConsensusEnable: {}, "
+ "rebootTimes: {}";
public static final String STARTING_SIMPLEPROGRESSINDEXASSIGNER =
- "Starting SimpleProgressIndexAssigner ...";
+ "正在启动 SimpleProgressIndexAssigner ...";
public static final String START_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
- "Start pipe DN task {} successfully within {} ms";
+ "启动 pipe DN task {} 成功,耗时 {} ms";
public static final String
START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
- "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+ "启动子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String STOP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
- "Stop pipe DN task {} successfully within {} ms";
+ "停止 pipe DN task {} 成功,耗时 {} ms";
public static final String STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT
=
- "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+ "停止子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
public static final String SUBTASK_IS_CLOSED_IGNORE_EXCEPTION =
"subtask {} 已关闭, ignore exception";
- public static final String SUBTASK_WORKER_IS_INTERRUPTED = "subtask worker
is interrupted";
+ public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断";
public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO =
"成功 persisted all pipe's info to configNode。";
public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN =
- "The executor {} and {} has been successfully shutdown.";
+ "执行器 {} 和 {} 已成功关闭。";
// ===================== EVENT =====================
@@ -422,9 +432,16 @@ public final class DataNodePipeMessages {
"加载 snapshot from byteBuffer {} 失败。";
public static final String FAILED_TO_START_SOURCES = "启动 sources 失败。";
public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
- "Heartbeat Event {} can not be supplied because the reference count can
not be increased";
+ "Heartbeat Event {} 无法被提供,因为其引用计数无法增加";
+ public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
+ "Event %s 无法被提供,因为其引用计数无法增加,事件代表的数据已经丢失";
public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
- "Interrupted waiting for processor to stop";
+ "等待 processor 停止时被中断";
+ public static final String
INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
+ "等待解析 TsFile %s 的权限信息时被中断。";
+ public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
+ "检查权限时解析 TsFile %s 失败。原因:%s";
+ public static final String READ_TSFILE_ERROR = "读取 TsFile %s 失败。";
public static final String
IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
"IoTDBSchemaRegionSource 不支持 transferring events under simple consensus";
public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT = "没有权限
transfer event: ";
@@ -829,6 +846,8 @@ public final class DataNodePipeMessages {
+ "Peeked event: {}, polled event: {}.";
public static final String THE_FILE_IS_NOT_FOUND_MAY_ALREADY =
"The file {} is not found, may already be deleted.";
+ public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
+ "网络接收 TsFile %s 失败,状态:%s";
public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT =
"The pipe {} was dropped so the event ack {} will be ignored.";
public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT_1 =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 70226b35dc7..3eb48b2c04b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -100,8 +100,8 @@ public class PipeDataNodePluginAgent {
if (information.isBuiltin()) {
String errorMessage =
String.format(
- "Failed to register PipePlugin %s, because "
- + "the given PipePlugin name is the same as a built-in
PipePlugin name.",
+ DataNodePipeMessages
+
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN,
pluginName);
LOGGER.warn(errorMessage);
throw new PipeException(errorMessage);
@@ -113,10 +113,9 @@ public class PipeDataNodePluginAgent {
&&
!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
String errMsg =
String.format(
- "Failed to register PipePlugin %s, because "
- + "existed md5 of jar file for pipe plugin %s "
- + "is different from the new jar file.",
- pluginName, pluginName);
+
DataNodePipeMessages.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH,
+ pluginName,
+ pluginName);
LOGGER.warn(errMsg);
throw new PipeException(errMsg);
}
@@ -170,9 +169,11 @@ public class PipeDataNodePluginAgent {
| ClassCastException e) {
String errorMessage =
String.format(
- "Failed to register PipePlugin %s(%s), because "
- + "its instance can not be constructed successfully.
Exception: %s",
- pluginName.toUpperCase(), className, e);
+ DataNodePipeMessages
+
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED,
+ pluginName.toUpperCase(),
+ className,
+ e);
LOGGER.warn(errorMessage, e);
throw new PipeException(errorMessage);
}
@@ -210,7 +211,8 @@ public class PipeDataNodePluginAgent {
if (information != null && information.isBuiltin()) {
String errorMessage =
- String.format("Failed to deregister builtin PipePlugin %s.",
pluginName);
+ String.format(
+ DataNodePipeMessages.FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN,
pluginName);
LOGGER.warn(errorMessage);
throw new PipeException(errorMessage);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index cce3ac29b37..6d4c3580bd2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -610,11 +610,12 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
final String errorMsg =
e instanceof InterruptedException
? String.format(
- "Interrupted when waiting for parsing privilege for TsFile
%s.",
+
DataNodePipeMessages.INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE,
resource.getTsFilePath())
: String.format(
- "Parse TsFile %s when checking privilege error. Because: %s",
- resource.getTsFilePath(), e.getMessage());
+
DataNodePipeMessages.PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR,
+ resource.getTsFilePath(),
+ e.getMessage());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg, e);
} finally {
@@ -861,7 +862,8 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
} catch (final Exception e) {
close();
- final String errorMsg = String.format("Read TsFile %s error.",
tsFile.getPath());
+ final String errorMsg =
+ String.format(DataNodePipeMessages.READ_TSFILE_ERROR,
tsFile.getPath());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg, e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 39a5a19a529..5c38c3a8540 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -199,11 +199,17 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
return client;
}
} catch (final Exception e) {
- LOGGER.warn(
- DataNodePipeMessages.FAILED_TO_BORROW_CLIENT_FOR_CACHED_LEADER,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_BORROW_CLIENT_FOR_CACHED_LEADER,
+ endPoint.getIp(),
+ endPoint.getPort(),
+ e),
+ e,
+ "Failed to borrow client %s:%s for cached leader.",
endPoint.getIp(),
- endPoint.getPort(),
- e);
+ endPoint.getPort());
}
return borrowClient();
@@ -357,11 +363,17 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
client.close();
client.invalidateAll();
} catch (final Exception e) {
- LOGGER.warn(
-
DataNodePipeMessages.FAILED_TO_CLOSE_CLIENT_AFTER_HANDSHAKE_FAILURE,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_CLOSE_CLIENT_AFTER_HANDSHAKE_FAILURE,
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ e),
+ e,
+ "Failed to close client %s:%s after handshake failure when the
manager is closed.",
targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- e);
+ targetNodeUrl.getPort());
}
}
client.setShouldReturnSelf(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index a4dba48b7e9..b4766a2dae4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId;
@@ -226,8 +227,13 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
IOTDB_CONFIG.getIotConsensusV2PipelineSize());
}
if (transferBuffer.isEmpty()) {
- LOGGER.info(
-
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_TRY_TO_REMOVE_EVENT_AFTER,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.info(
+
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_TRY_TO_REMOVE_EVENT_AFTER,
+ consensusGroupId,
+ event),
+ "IoTConsensusV2-ConsensusGroup-%s: try to remove event-%s after
iotConsensusV2AsyncConnector being closed. Ignore it.",
consensusGroupId,
event);
return;
@@ -240,8 +246,15 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
if (current.equalsInIoTConsensusV2(event)) {
iterator.remove();
} else {
- LOGGER.warn(
-
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_NOT_FOUND_IN_TRANSFERBUFFER,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_NOT_FOUND_IN_TRANSFERBUFFER,
+ consensusGroupId,
+ event,
+ transferBuffer.size()),
+ "IoTConsensusV2-ConsensusGroup-%s: event-%s not found in
transferBuffer, skip removing. queue size = %s",
consensusGroupId,
event,
transferBuffer.size());
@@ -520,8 +533,14 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
if (System.currentTimeMillis() - retryStartTime >
TimeUnit.SECONDS.toMillis(20)) {
// just in case that some events are polled and re-added into queue
again and again,
// causing this loop to run forever.
- LOGGER.warn(
-
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_RETRYEVENTQUEUE_IS_NOT_EMPTY_AFTER,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_CONSENSUSGROUP_RETRYEVENTQUEUE_IS_NOT_EMPTY_AFTER,
+ consensusGroupId,
+ retryEventQueue.size()),
+ "IoTConsensusV2-ConsensusGroup-%s: retryEventQueue is not empty
after 20 seconds. retryQueue size: %s",
consensusGroupId,
retryEventQueue.size());
return;
@@ -535,8 +554,16 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
peekedEvent.getRetryInterval() >
EnrichedEvent.INITIAL_RETRY_INTERVAL_FOR_IOTV2
? peekedEvent.getRetryInterval()
: 0L;
- LOGGER.info(
-
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_RETRY_WITH_INTERVAL_FOR_INDEX,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.info(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_CONSENSUSGROUP_RETRY_WITH_INTERVAL_FOR_INDEX,
+ consensusGroupId,
+ retryInterval,
+ peekedEvent.getReplicateIndexForIoTV2(),
+ peekedEvent),
+ "IoTConsensusV2-ConsensusGroup-%s: retry with interval %s for
index %s %s",
consensusGroupId,
retryInterval,
peekedEvent.getReplicateIndexForIoTV2(),
@@ -553,12 +580,14 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
} else if (peekedEvent instanceof PipeDeleteDataNodeEvent) {
retryTransfer((PipeDeleteDataNodeEvent) peekedEvent);
} else {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(
- DataNodePipeMessages
-
.IOTCONSENSUSV2ASYNCCONNECTOR_DOES_NOT_SUPPORT_TRANSFER_GENERIC_EVENT,
- peekedEvent);
- }
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2ASYNCCONNECTOR_DOES_NOT_SUPPORT_TRANSFER_GENERIC_EVENT,
+ peekedEvent),
+ "IoTConsensusV2AsyncConnector does not support transfer
generic event: %s.",
+ peekedEvent);
}
},
retryInterval,
@@ -629,15 +658,28 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
boolean res = retryEventQueue.offer(event);
if (res) {
- LOGGER.info(
- DataNodePipeMessages
-
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED_1,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.info(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED_1,
+ consensusGroupId,
+ event,
+ event.getReplicateIndexForIoTV2()),
+ "IoTConsensusV2-ConsensusGroup-%s: Event %s replicate index %s
transfer failed, will be added to retry queue.",
consensusGroupId,
event,
event.getReplicateIndexForIoTV2());
} else {
- LOGGER.warn(
-
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED,
+ consensusGroupId,
+ event,
+ event.getReplicateIndexForIoTV2()),
+ "IoTConsensusV2-ConsensusGroup-%s: Event %s replicate index %s
transfer failed, added to retry queue failed, this event will be ignored.",
consensusGroupId,
event,
event.getReplicateIndexForIoTV2());
@@ -676,14 +718,23 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink
implements ConsensusPipeS
private void logOnClientException(
final AsyncIoTConsensusV2ServiceClient client, final Exception e) {
if (client == null) {
- LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+ PipeLogger.log(
+ ignored -> LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e),
+ e,
+ THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
} else {
- LOGGER.warn(
- String.format(
- THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
- client.getTEndpoint().getIp(),
- client.getTEndpoint().getPort()),
- e);
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ String.format(
+ THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
+ client.getTEndpoint().getIp(),
+ client.getTEndpoint().getPort()),
+ e),
+ e,
+ THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
+ client.getTEndpoint().getIp(),
+ client.getTEndpoint().getPort());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
index 35ba6a89f7a..36dfdffbf42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.RetryUtils;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferReq;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -102,12 +103,19 @@ public class IoTConsensusV2DeleteEventHandler
@Override
public void onError(Exception e) {
- LOGGER.warn(
-
DataNodePipeMessages.FAILED_TO_TRANSFER_PIPEDELETENODEEVENT_COMMITTER_KEY_REPLICATE,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_TRANSFER_PIPEDELETENODEEVENT_COMMITTER_KEY_REPLICATE,
+ event.coreReportMessage(),
+ event.getCommitterKey(),
+ event.getReplicateIndexForIoTV2(),
+ e),
+ e,
+ "Failed to transfer PipeDeleteNodeEvent %s (committer key=%s, replicate index=%s).",
event.coreReportMessage(),
event.getCommitterKey(),
- event.getReplicateIndexForIoTV2(),
- e);
+ event.getReplicateIndexForIoTV2());
if (RetryUtils.needRetryWithIncreasingInterval(e)) {
// just in case for overflow
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
index b2026c809dd..b107b7fe064 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2BatchTransferReq;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2BatchTransferResp;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -117,17 +118,26 @@ public class IoTConsensusV2TabletBatchEventHandler
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
-
DataNodePipeMessages.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_BATCH_TOTAL,
- events.size(),
+ final Object pipeNames =
events.stream()
.map(
event ->
event instanceof EnrichedEvent
? ((EnrichedEvent) event).getPipeName()
: "UNKNOWN")
- .collect(Collectors.toSet()),
- exception);
+ .collect(Collectors.toSet());
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_BATCH_TOTAL,
+ events.size(),
+ pipeNames,
+ exception),
+ exception,
+ "IoTConsensusV2: Failed to transfer TabletInsertionEvent batch. Total failed events: %s, related pipe names: %s",
+ events.size(),
+ pipeNames);
connector.addFailureEventsToRetryQueue(events);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
index 4c31942692d..a74e334d5e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.RetryUtils;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferReq;
import
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -113,12 +114,20 @@ public abstract class
IoTConsensusV2TabletInsertionEventHandler<
@Override
public void onError(Exception exception) {
EnrichedEvent event = (EnrichedEvent) this.event;
- LOGGER.warn(
-
DataNodePipeMessages.FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_COMMITTER_KEY_REPLICATE,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_COMMITTER_KEY_REPLICATE,
+ event.coreReportMessage(),
+ event.getCommitterKey(),
+ event.getReplicateIndexForIoTV2(),
+ exception),
+ exception,
+ "Failed to transfer TabletInsertionEvent %s (committer key=%s, replicate index=%s).",
event.coreReportMessage(),
event.getCommitterKey(),
- event.getReplicateIndexForIoTV2(),
- exception);
+ event.getReplicateIndexForIoTV2());
if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
// just in case for overflow
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
index 490806cfddb..4e269aaa7e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.response.IoTConsensusV2TransferFilePieceResp;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId;
@@ -290,13 +291,22 @@ public class IoTConsensusV2TsFileInsertionEventHandler
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
-
DataNodePipeMessages.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TSFILEINSERTIONEVENT_COMMITTER_KEY,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TSFILEINSERTIONEVENT_COMMITTER_KEY,
+ consensusPipeName,
+ tsFile,
+ event.getCommitterKey(),
+ event.getReplicateIndexForIoTV2(),
+ exception),
+ exception,
+ "IoTConsensusV2-%s: Failed to transfer TsFileInsertionEvent %s (committer key %s, replicate index %s).",
consensusPipeName,
tsFile,
event.getCommitterKey(),
- event.getReplicateIndexForIoTV2(),
- exception);
+ event.getReplicateIndexForIoTV2());
if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
// just in case for overflow
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 5ae7942d201..c0a9eb6a79d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -441,7 +441,8 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
DataNodePipeMessages.REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE, position,
file);
} else if (status.code ==
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
final String errorMsg =
- String.format("Network failed to receive tsFile %s, status: %s",
file, status);
+ String.format(
+
DataNodePipeMessages.NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS, file, status);
LOGGER.warn(errorMsg);
throw new PipeConnectionException(errorMsg);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index bd3f06ba778..b8b169b1f6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -269,7 +269,12 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
sealedFile.left));
}
} catch (final Exception e) {
- LOGGER.warn(DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_BATCH,
dbTsFilePairs, e);
+ PipeLogger.log(
+ ignored ->
+
LOGGER.warn(DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_BATCH,
dbTsFilePairs, e),
+ e,
+ "Failed to transfer tsfile batch (%s).",
+ dbTsFilePairs);
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
addFailureEventsToRetryQueue(events, e);
}
@@ -461,17 +466,27 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
} catch (final Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
- LOGGER.warn(
-
DataNodePipeMessages.TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY_WAS_INTERRUPTED,
- pipeTransferTsFileHandler.getTsFile(),
- e);
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+
DataNodePipeMessages.TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY_WAS_INTERRUPTED,
+ pipeTransferTsFileHandler.getTsFile(),
+ e),
+ e,
+ "Transfer tsfile event %s asynchronously was interrupted.",
+ pipeTransferTsFileHandler.getTsFile());
}
pipeTransferTsFileHandler.onError(e);
- LOGGER.warn(
-
DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY,
- pipeTransferTsFileHandler.getTsFile(),
- e);
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY,
+ pipeTransferTsFileHandler.getTsFile(),
+ e),
+ e,
+ "Failed to transfer tsfile event %s asynchronously.",
+ pipeTransferTsFileHandler.getTsFile());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index eaf9f0b73e4..d543e736743 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
@@ -106,7 +107,10 @@ public abstract class PipeTransferTrackableHandler
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
-
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO);
+ PipeLogger.log(
+ ignored ->
+
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
+ "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
@@ -139,9 +143,9 @@ public abstract class PipeTransferTrackableHandler
return;
}
- LOGGER.warn(
- "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. "
- + "Request body size: {}, threshold: {}",
+ PipeLogger.log(
+ LOGGER::warn,
+ "The body size of the request is too large. The request will be sliced. Origin req: %s-%s. Request body size: %s, threshold: %s",
req.getVersion(),
req.getType(),
req.body.limit(),
@@ -242,11 +246,12 @@ public abstract class PipeTransferTrackableHandler
final TPipeTransferReq originalReq,
final boolean shouldReturnSelf,
final Exception exception) {
- LOGGER.warn(
- "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.",
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer slice. Origin req: %s-%s. Retry the whole transfer.",
originalReq.getVersion(),
- originalReq.getType(),
- exception);
+ originalReq.getType());
try {
client.setShouldReturnSelf(shouldReturnSelf);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 0eb226c4639..3fa5e557a20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -161,8 +161,13 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
this.client = client;
if (client == null) {
- LOGGER.warn(
- DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
+ tsFile),
+ "Client has been returned to the pool. Connector is %s. TsFile: %s.",
sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
@@ -429,7 +434,10 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
-
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO);
+ PipeLogger.log(
+ ignored ->
+
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
+ "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
@@ -442,8 +450,13 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
if (client == null) {
- LOGGER.warn(
- DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+ PipeLogger.log(
+ ignored ->
+ LOGGER.warn(
+ DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+ sink.isClosed() ? "CLOSED" : "NOT CLOSED",
+ tsFile),
+ "Client has been returned to the pool. Connector is %s. TsFile: %s.",
sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 9af44d54578..c219acbc697 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -279,9 +279,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
// event and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
- "TsFile Event %s can not be supplied because "
- + "the reference count can not be increased, "
- + "the data represented by this event is lost",
+
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
event.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 3d9c81bcb0c..2835da02d83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -111,9 +112,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
// and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
- "Event %s can not be supplied because "
- + "the reference count can not be increased, "
- + "the data represented by this event is lost",
+
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
realtimeEvent.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 772154ba07f..b13b2040016 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -504,9 +504,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
} else {
// This would not happen, but just in case.
LOGGER.error(
- "Heartbeat Event {} can not be supplied because "
- + "the reference count can not be increased",
- event.getEvent());
+ DataNodePipeMessages.HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE,
event.getEvent());
// Do not report exception since the PipeHeartbeatEvent doesn't affect
// the correction of pipe progress.
@@ -524,9 +522,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
// event and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
- "Event %s can not be supplied because "
- + "the reference count can not be increased, "
- + "the data represented by this event is lost",
+
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
event.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index d70d93db548..73bef31d85b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -96,9 +97,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
// and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
- "Event %s can not be supplied because "
- + "the reference count can not be increased, "
- + "the data represented by this event is lost",
+
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
realtimeEvent.getEvent());
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
diff --git
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
index 482717e8b04..f0086f26898 100644
---
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
+++
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
@@ -395,6 +395,20 @@ public final class PipeMessages {
+ "because of {}. Will retry forever.";
public static final String RETRY_EXECUTING_SUBTASK_FOREVER =
"Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}, last exception: {}";
+ public static final String
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH =
+ "Pipe: %s cannot be used together with %s or %s.";
+ public static final String
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION =
+ "Pipe: %s cannot be used together with %s.";
+ public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER =
+ "Pipe: %s and %s cannot be used together.";
+ public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN =
+ "Pipe: The parameter %s only supports a single pattern now.";
+ public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK =
+ "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}].";
+ public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN =
+ "Pipe: The provided exclusion pattern fully covers the inclusion pattern. This pipe pattern will match nothing. Inclusion: [%s], Exclusion: [%s]";
+ public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS =
+ "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. These paths will be excluded. Inclusion: [{}], Exclusion: [{}]";
// ===================== PipeAbstractSinkSubtask =====================
diff --git
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
index eec9a123170..d081a8fd4cf 100644
---
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
+++
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
@@ -379,6 +379,20 @@ public final class PipeMessages {
"执行子任务 {}(创建时间:{},类名:{})失败,原因:{}。将无限重试。";
public static final String RETRY_EXECUTING_SUBTASK_FOREVER =
"重试执行子任务 {}(创建时间:{},类名:{}),重试次数 {},上次异常:{}";
+ public static final String
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH =
+ "Pipe:%s 不能与 %s 或 %s 同时使用。";
+ public static final String
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION =
+ "Pipe:%s 不能与 %s 同时使用。";
+ public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER =
+ "Pipe:%s 和 %s 不能同时使用。";
+ public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN =
+ "Pipe:参数 %s 当前只支持单个 pattern。";
+ public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK =
+ "Pipe:对 inclusion [{}] 和 exclusion [{}] 执行 pattern 覆盖检查失败。";
+ public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN =
+ "Pipe:给定 exclusion pattern 完全覆盖了 inclusion pattern。该 pipe pattern 不会匹配任何内容。Inclusion: [%s], Exclusion: [%s]";
+ public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS =
+ "Pipe:给定 exclusion pattern 覆盖了 {} / {} 条 inclusion 路径。这些路径将被排除。Inclusion: [{}], Exclusion: [{}]";
// ===================== PipeAbstractSinkSubtask =====================
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 6631f60a65c..48c4c64abaa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.datastructure.pattern;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -180,8 +181,10 @@ public abstract class TreePattern {
if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) {
final String msg =
String.format(
- "Pipe: %s cannot be used together with %s or %s.",
- SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_KEY,
SOURCE_PATH_KEY);
+
PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_KEY,
+ SOURCE_PATH_KEY);
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -219,8 +222,9 @@ public abstract class TreePattern {
EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) {
final String msg =
String.format(
- "Pipe: %s cannot be used together with %s.",
- SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY);
+
PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY);
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -256,9 +260,7 @@ public abstract class TreePattern {
if (inclusionPatterns.isEmpty()) {
final String msg =
String.format(
- "Pipe: The provided exclusion pattern fully covers the inclusion pattern. "
- + "This pipe pattern will match nothing. "
- + "Inclusion: %s, Exclusion: %s",
+ PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN,
sourceParameters.getStringByKeys(
EXTRACTOR_PATTERN_INCLUSION_KEY,
SOURCE_PATTERN_INCLUSION_KEY,
@@ -399,7 +401,8 @@ public abstract class TreePattern {
if (path != null && pattern != null) {
final String msg =
- String.format("Pipe: %s and %s cannot be used together.",
pathKeyName, patternKeyName);
+ String.format(
+ PipeMessages.PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER,
pathKeyName, patternKeyName);
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -439,7 +442,7 @@ public abstract class TreePattern {
if (!allowMultiple && patterns.size() > 1) {
final String msg =
- String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey);
+ String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN,
parameterKey);
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -692,7 +695,7 @@ public abstract class TreePattern {
if (!allowMultiple && patterns.size() > 1) {
final String msg =
- String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey);
+ String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN,
parameterKey);
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -854,7 +857,7 @@ public abstract class TreePattern {
} catch (final Exception e) {
// This check is best-effort. Do not fail construction.
LOGGER.warn(
- "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}].",
+ PipeMessages.FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK,
inclusion.getPattern(),
exclusion.getPattern(),
e);
@@ -866,18 +869,15 @@ public abstract class TreePattern {
// All inclusion paths are covered by the exclusion
final String msg =
String.format(
- "Pipe: The provided exclusion pattern fully covers the inclusion pattern. "
- + "This pipe pattern will match nothing. "
- + "Inclusion: [%s], Exclusion: [%s]",
- inclusion.getPattern(), exclusion.getPattern());
+ PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN,
+ inclusion.getPattern(),
+ exclusion.getPattern());
LOGGER.warn(msg);
throw new PipeException(msg);
} else if (coveredCount > 0) {
// Some inclusion paths are covered
LOGGER.warn(
- "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. "
- + "These paths will be excluded. "
- + "Inclusion: [{}], Exclusion: [{}]",
+ PipeMessages.EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS,
coveredCount,
inclusionPaths.size(),
inclusion.getPattern(),