This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0b1bdb745 [Improve][ST-Engine] Improve log output (#3651)
0b1bdb745 is described below
commit 0b1bdb745b2196a798b3192887e69b24baafdcde
Author: Eric <[email protected]>
AuthorDate: Tue Dec 6 09:37:33 2022 +0800
[Improve][ST-Engine] Improve log output (#3651)
* improve log output
* Update
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
Co-authored-by: TaoZex <[email protected]>
Co-authored-by: TaoZex <[email protected]>
---
.../engine/server/dag/physical/PhysicalVertex.java | 42 +++++++++++++++-------
1 file changed, 29 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 38069a16d..f44b69e32 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -146,17 +146,31 @@ public class PhysicalVertex {
}
this.nodeEngine = nodeEngine;
- this.taskFullName =
- String.format(
- "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)],
taskGroupLocation: [%s]",
- jobImmutableInformation.getJobConfig().getName(),
- jobImmutableInformation.getJobId(),
- pipelineId,
- totalPipelineNum,
- taskGroup.getTaskGroupName(),
- subTaskGroupIndex + 1,
- parallelism,
- taskGroupLocation);
+ if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) {
+ this.taskFullName =
+ String.format(
+ "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)],
taskGroupLocation: [%s]",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
+ pipelineId,
+ totalPipelineNum,
+ taskGroup.getTaskGroupName(),
+ subTaskGroupIndex + 1,
+ parallelism,
+ taskGroupLocation);
+ } else {
+ this.taskFullName =
+ String.format(
+ "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
+ pipelineId,
+ totalPipelineNum,
+ taskGroup.getTaskGroupName(),
+ subTaskGroupIndex + 1,
+ parallelism);
+ }
+
this.taskFuture = new CompletableFuture<>();
this.runningJobStateIMap = runningJobStateIMap;
@@ -167,7 +181,8 @@ public class PhysicalVertex {
this.taskFuture = new CompletableFuture<>();
ExecutionState executionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
if (executionState != null) {
- LOGGER.info(String.format("The task %s is in state %s when init
state future", taskFullName, executionState));
+ LOGGER.info(
+ String.format("The task %s is in state %s when init state
future", taskFullName, executionState));
}
// If the task state is CANCELING we need call
noticeTaskExecutionServiceCancel().
if (ExecutionState.CANCELING.equals(executionState)) {
@@ -329,7 +344,8 @@ public class PhysicalVertex {
while (!taskFuture.isDone() &&
nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null)
{
try {
i++;
- LOGGER.info(String.format("send cancel %s operator to member
%s", taskFullName, getCurrentExecutionAddress()));
+ LOGGER.info(
+ String.format("Send cancel %s operator to member %s",
taskFullName, getCurrentExecutionAddress()));
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
new CancelTaskOperation(taskGroupLocation),
getCurrentExecutionAddress())