This is an automated email from the ASF dual-hosted git repository.
weiraowang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3efcf0c5ac Fix message in MessageRetryRunner might disorder (#14725)
3efcf0c5ac is described below
commit 3efcf0c5acc55df05b09187e17042f7552847b04
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Aug 9 20:19:02 2023 +0800
Fix message in MessageRetryRunner might disorder (#14725)
---
.../server/worker/message/MessageRetryRunner.java | 79 ++++++++++++++++------
1 file changed, 60 insertions(+), 19 deletions(-)
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 830f470a44..44befbe31d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -27,12 +27,15 @@ import
org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.commons.collections4.MapUtils;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +43,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
+import com.google.common.base.Objects;
+
@Component
@Slf4j
public class MessageRetryRunner extends BaseDaemonThread {
@@ -48,15 +53,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
super("WorkerMessageRetryRunnerThread");
}
- private static long MESSAGE_RETRY_WINDOW =
Duration.ofMinutes(5L).toMillis();
+ private static final long MESSAGE_RETRY_WINDOW =
Duration.ofMinutes(5L).toMillis();
@Lazy
@Autowired
private List<MessageSender> messageSenders;
- private Map<MessageType, MessageSender<BaseMessage>> messageSenderMap =
new HashMap<>();
+ private final Map<MessageType, MessageSender<BaseMessage>>
messageSenderMap = new HashMap<>();
- private Map<Integer, Map<MessageType, BaseMessage>> needToRetryMessages =
new ConcurrentHashMap<>();
+ private final Map<Integer, List<TaskInstanceMessage>> needToRetryMessages
= new ConcurrentHashMap<>();
@Override
public synchronized void start() {
@@ -70,14 +75,14 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
public void addRetryMessage(int taskInstanceId, @NonNull MessageType
messageType, BaseMessage baseMessage) {
- needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new
ConcurrentHashMap<>()).put(messageType,
- baseMessage);
+ needToRetryMessages.computeIfAbsent(taskInstanceId, k ->
Collections.synchronizedList(new ArrayList<>()))
+ .add(TaskInstanceMessage.of(taskInstanceId, messageType,
baseMessage));
}
public void removeRetryMessage(int taskInstanceId, @NonNull MessageType
messageType) {
- Map<MessageType, BaseMessage> retryMessages =
needToRetryMessages.get(taskInstanceId);
- if (retryMessages != null) {
- retryMessages.remove(messageType);
+ List<TaskInstanceMessage> taskInstanceMessages =
needToRetryMessages.get(taskInstanceId);
+ if (taskInstanceMessages != null) {
+ taskInstanceMessages.remove(TaskInstanceMessage.of(taskInstanceId,
messageType, null));
}
}
@@ -86,10 +91,10 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
public void updateMessageHost(int taskInstanceId, String
messageReceiverHost) {
- Map<MessageType, BaseMessage> needToRetryMessages =
this.needToRetryMessages.get(taskInstanceId);
- if (needToRetryMessages != null) {
- needToRetryMessages.values().forEach(baseMessage -> {
- baseMessage.setMessageReceiverAddress(messageReceiverHost);
+ List<TaskInstanceMessage> taskInstanceMessages =
this.needToRetryMessages.get(taskInstanceId);
+ if (taskInstanceMessages != null) {
+ taskInstanceMessages.forEach(taskInstanceMessage -> {
+
taskInstanceMessage.getMessage().setMessageReceiverAddress(messageReceiverHost);
});
}
}
@@ -102,21 +107,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
long now = System.currentTimeMillis();
- Iterator<Map.Entry<Integer, Map<MessageType, BaseMessage>>>
iterator =
+ Iterator<Map.Entry<Integer, List<TaskInstanceMessage>>>
iterator =
needToRetryMessages.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<Integer, Map<MessageType, BaseMessage>>
taskEntry = iterator.next();
+ Map.Entry<Integer, List<TaskInstanceMessage>> taskEntry =
iterator.next();
Integer taskInstanceId = taskEntry.getKey();
- Map<MessageType, BaseMessage> retryMessageMap =
taskEntry.getValue();
- if (retryMessageMap.isEmpty()) {
+ List<TaskInstanceMessage> taskInstanceMessages =
taskEntry.getValue();
+ if (taskInstanceMessages.isEmpty()) {
iterator.remove();
continue;
}
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
try {
- for (Map.Entry<MessageType, BaseMessage> messageEntry
: retryMessageMap.entrySet()) {
- MessageType messageType = messageEntry.getKey();
- BaseMessage message = messageEntry.getValue();
+ for (TaskInstanceMessage taskInstanceMessage :
taskInstanceMessages) {
+ MessageType messageType =
taskInstanceMessage.getMessageType();
+ BaseMessage message =
taskInstanceMessage.getMessage();
if (now - message.getMessageSendTime() >
MESSAGE_RETRY_WINDOW) {
log.info("Begin retry send message to master,
message: {}", message);
message.setMessageSendTime(now);
@@ -144,4 +149,40 @@ public class MessageRetryRunner extends BaseDaemonThread {
public void clearMessage() {
needToRetryMessages.clear();
}
+
+ /**
+ * If two messages have the same taskInstanceId and messageType, they are
considered the same message.
+ */
+ @Data
+ public static class TaskInstanceMessage {
+
+ private long taskInstanceId;
+ private MessageType messageType;
+ private BaseMessage message;
+
+ public static TaskInstanceMessage of(long taskInstanceId, MessageType
messageType, BaseMessage message) {
+ TaskInstanceMessage taskInstanceMessage = new
TaskInstanceMessage();
+ taskInstanceMessage.setTaskInstanceId(taskInstanceId);
+ taskInstanceMessage.setMessageType(messageType);
+ taskInstanceMessage.setMessage(message);
+ return taskInstanceMessage;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskInstanceMessage that = (TaskInstanceMessage) o;
+ return taskInstanceId == that.taskInstanceId && messageType ==
that.messageType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(taskInstanceId, messageType);
+ }
+ }
}