This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 69676b445c [Improvement][UT] Improve Worker registry coverage (#15380)
69676b445c is described below
commit 69676b445c6a87dbd97c8bfd5797a26600e1bef6
Author: John Huang <[email protected]>
AuthorDate: Tue Feb 6 10:45:53 2024 +0800
[Improvement][UT] Improve Worker registry coverage (#15380)
Co-authored-by: fuchanghai <[email protected]>
Co-authored-by: Eric Gao <[email protected]>
Co-authored-by: Rick Cheng <[email protected]>
---
.../server/worker/registry/WorkerStopStrategy.java | 3 -
.../worker/registry/WorkerWaitingStrategy.java | 11 ++
.../WorkerConnectionStateListenerTest.java | 60 +++++++
.../worker/registry/WorkerRegistryClientTest.java | 40 +++--
.../server/worker/registry/WorkerStrategyTest.java | 187 +++++++++++++++++++++
5 files changed, 284 insertions(+), 17 deletions(-)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
index ee4e960a80..4c575a29e8 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import lombok.extern.slf4j.Slf4j;
@@ -34,8 +33,6 @@ public class WorkerStopStrategy implements WorkerConnectStrategy {
@Autowired
public RegistryClient registryClient;
- @Autowired
- private WorkerConfig workerConfig;
@Override
public void disconnect() {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index 4437e3545d..2be156b5df 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThread
import java.time.Duration;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,6 +53,16 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
@Autowired
private WorkerTaskExecutorThreadPool workerManagerThread;
+ public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
+ @NonNull RegistryClient registryClient,
+ @NonNull MessageRetryRunner messageRetryRunner,
+ @NonNull WorkerTaskExecutorThreadPool workerManagerThread) {
+ this.workerConfig = workerConfig;
+ this.registryClient = registryClient;
+ this.messageRetryRunner = messageRetryRunner;
+ this.workerManagerThread = workerManagerThread;
+ }
+
@Override
public void disconnect() {
try {
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
new file mode 100644
index 0000000000..14d75779b9
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import static org.mockito.Mockito.times;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * worker registry test
+ */
+@ExtendWith(MockitoExtension.class)
+public class WorkerConnectionStateListenerTest {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
+ @InjectMocks
+ private WorkerConnectionStateListener workerConnectionStateListener;
+ @Mock
+ private WorkerConfig workerConfig;
+ @Mock
+ private WorkerConnectStrategy workerConnectStrategy;
+
+ @Test
+ public void testWorkerConnectionStateListener() {
+ workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
+
+ workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
+ Mockito.verify(workerConnectStrategy, times(1)).reconnect();
+
+ workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
+
+ workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
+ Mockito.verify(workerConnectStrategy, times(1)).disconnect();
+ }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index f44c759420..8c7071cf60 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.BDDMockito.given;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtecti
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,6 +42,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* worker registry test
@@ -44,26 +51,24 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class WorkerRegistryClientTest {
+ private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@InjectMocks
private WorkerRegistryClient workerRegistryClient;
-
@Mock
private RegistryClient registryClient;
-
@Mock
private WorkerConfig workerConfig;
-
@Mock
private MetricsProvider metricsProvider;
-
@Mock
private WorkerTaskExecutorThreadPool workerManagerThread;
-
@Mock
private WorkerConnectStrategy workerConnectStrategy;
+ @Mock
+ private IStoppable stoppable;
@Test
- public void testStart() {
+ public void testWorkerRegistryClientbasic() {
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
@@ -75,16 +80,23 @@ public class WorkerRegistryClientTest {
workerRegistryClient.initWorkRegistry();
workerRegistryClient.start();
- Assertions.assertTrue(true);
+ workerRegistryClient.setRegistryStoppable(stoppable);
}
@Test
- public void testUnRegistry() {
-
- }
-
- @Test
- public void testGetWorkerZkPaths() {
-
+ public void testWorkerRegistryClientgetAlertServerAddress() {
+ given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
+ .willReturn(new ArrayList<Server>());
+ Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(), Optional.empty());
+ Mockito.reset(registryClient);
+ String host = "test";
+ Integer port = 1;
+ Server server = new Server();
+ server.setHost(host);
+ server.setPort(port);
+ given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
+ .willReturn(new ArrayList<Server>(Arrays.asList(server)));
+ Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(),
+ String.format("%s:%d", host, port));
}
}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java
new file mode 100644
index 0000000000..671fac5277
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doNothing;
+
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
+
+import java.time.Duration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * worker registry test
+ */
+@ExtendWith(MockitoExtension.class)
+public class WorkerStrategyTest {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class);
+ @Mock
+ private RegistryClient registryClient;
+ @Mock
+ private IStoppable stoppable;
+ @Mock
+ private WorkerConfig workerConfig;
+ @Mock
+ private WorkerRpcServer workerRpcServer;
+ @Mock
+ private MessageRetryRunner messageRetryRunner;
+ @Mock
+ private WorkerTaskExecutorThreadPool workerManagerThread;
+ @Mock
+ private ConnectStrategyProperties connectStrategyProperties;
+
+ @Test
+ public void testWorkerStopStrategy() {
+ given(registryClient.getStoppable())
+ .willReturn(stoppable);
+ WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
+ workerStopStrategy.registryClient = registryClient;
+ workerStopStrategy.reconnect();
+ workerStopStrategy.disconnect();
+ Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP);
+ }
+
+ @Test
+ public void testWorkerWaitingStrategyreconnect() {
+ WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
+ workerConfig,
+ registryClient,
+ messageRetryRunner,
+ workerManagerThread);
+ Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.isRunning())
+ .thenReturn(true);
+ workerWaitingStrategy.reconnect();
+
+ }
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ doNothing().when(stoppable).stop(anyString());
+ given(registryClient.getStoppable())
+ .willReturn(stoppable);
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.recoverFromWaiting())
+ .thenThrow(new ServerLifeCycleException(""));
+ workerWaitingStrategy.reconnect();
+ }
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.recoverFromWaiting())
+ .thenAnswer(invocation -> null);
+ workerWaitingStrategy.reconnect();
+ }
+ }
+
+ @Test
+ public void testWorkerWaitingStrategydisconnect() {
+ WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
+ workerConfig,
+ registryClient,
+ messageRetryRunner,
+ workerManagerThread);
+ Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ doNothing().when(stoppable).stop(anyString());
+ given(registryClient.getStoppable())
+ .willReturn(stoppable);
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.toWaiting())
+ .thenThrow(new ServerLifeCycleException(""));
+ workerWaitingStrategy.disconnect();
+ }
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
+ given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
+ Mockito.reset(registryClient);
+ doNothing().when(registryClient).connectUntilTimeout(any());
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.toWaiting())
+ .thenAnswer(invocation -> null);
+ workerWaitingStrategy.disconnect();
+ }
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
+ given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
+ Mockito.reset(registryClient);
+ doNothing().when(stoppable).stop(anyString());
+ given(registryClient.getStoppable())
+ .willReturn(stoppable);
+ Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.toWaiting())
+ .thenAnswer(invocation -> null);
+ workerWaitingStrategy.disconnect();
+ }
+
+ try (
+ MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
+ Mockito.mockStatic(ServerLifeCycleManager.class)) {
+ Mockito.reset(workerConfig);
+ given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException(""));
+ doNothing().when(stoppable).stop(anyString());
+ given(registryClient.getStoppable())
+ .willReturn(stoppable);
+ serverLifeCycleManagerMockedStatic
+ .when(() -> ServerLifeCycleManager.toWaiting())
+ .thenAnswer(invocation -> null);
+ workerWaitingStrategy.disconnect();
+ }
+ }
+}