This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b128d7e30c KAFKA-14006: Parameterize WorkerConnectorTest suite
(#12307)
6b128d7e30c is described below
commit 6b128d7e30cb654f26f11ae650465aa3caa38185
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 8 08:20:35 2023 -0700
KAFKA-14006: Parameterize WorkerConnectorTest suite (#12307)
Reviewers: Sagar Rao <[email protected]>, Christo Lolov
<[email protected]>, Kvicii <[email protected]>, Mickael Maison
<[email protected]>
---
.../kafka/connect/runtime/WorkerConnectorTest.java | 133 ++++++++++++---------
1 file changed, 78 insertions(+), 55 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index cf07d5d2e29..9ced632b906 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -29,15 +30,22 @@ import
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -54,7 +62,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@RunWith(Parameterized.class)
public class WorkerConnectorTest {
private static final String VERSION = "1.1";
@@ -68,15 +76,41 @@ public class WorkerConnectorTest {
public ConnectorConfig connectorConfig;
public MockConnectMetrics metrics;
+ @Rule
+ public MockitoRule rule =
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
@Mock private Plugins plugins;
- @Mock private SourceConnector sourceConnector;
- @Mock private SinkConnector sinkConnector;
@Mock private CloseableConnectorContext ctx;
@Mock private ConnectorStatus.Listener listener;
- @Mock private CloseableOffsetStorageReader offsetStorageReader;
- @Mock private ConnectorOffsetBackingStore offsetStore;
@Mock private ClassLoader classLoader;
- private Connector connector;
+
+ private final ConnectorType connectorType;
+ private final Connector connector;
+ private final CloseableOffsetStorageReader offsetStorageReader;
+ private final ConnectorOffsetBackingStore offsetStore;
+
+ @Parameterized.Parameters
+ public static Collection<ConnectorType> parameters() {
+ return Arrays.asList(ConnectorType.SOURCE, ConnectorType.SINK);
+ }
+
+ public WorkerConnectorTest(ConnectorType connectorType) {
+ this.connectorType = connectorType;
+ switch (connectorType) {
+ case SINK:
+ this.connector = mock(SinkConnector.class);
+ this.offsetStorageReader = null;
+ this.offsetStore = null;
+ break;
+ case SOURCE:
+ this.connector = mock(SourceConnector.class);
+ this.offsetStorageReader =
mock(CloseableOffsetStorageReader.class);
+ this.offsetStore = mock(ConnectorOffsetBackingStore.class);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected connector type: "
+ connectorType);
+ }
+ }
@Before
public void setup() {
@@ -92,7 +126,6 @@ public class WorkerConnectorTest {
@Test
public void testInitializeFailure() {
RuntimeException exception = new RuntimeException();
- connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).initialize(any());
@@ -113,13 +146,12 @@ public class WorkerConnectorTest {
@Test
public void testFailureIsFinalState() {
RuntimeException exception = new RuntimeException();
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).initialize(any());
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
assertFailedMetric(workerConnector);
@@ -140,15 +172,13 @@ public class WorkerConnectorTest {
@Test
public void testStartupAndShutdown() {
- connector = sourceConnector;
-
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
@@ -166,14 +196,12 @@ public class WorkerConnectorTest {
@Test
public void testStartupAndPause() {
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSinkMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
@@ -197,14 +225,13 @@ public class WorkerConnectorTest {
@Test
public void testStartupAndStop() {
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSinkMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
@@ -228,8 +255,6 @@ public class WorkerConnectorTest {
@Test
public void testOnResume() {
- connector = sourceConnector;
-
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
@@ -237,7 +262,7 @@ public class WorkerConnectorTest {
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
@@ -260,14 +285,13 @@ public class WorkerConnectorTest {
@Test
public void testStartupPaused() {
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSinkMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.shutdown();
@@ -285,14 +309,13 @@ public class WorkerConnectorTest {
@Test
public void testStartupStopped() {
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSinkMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
@@ -311,16 +334,15 @@ public class WorkerConnectorTest {
@Test
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
- connector = sinkConnector;
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).start(CONFIG);
Callback<TargetState> onStateChange = mockCallback();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSinkMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertFailedMetric(workerConnector);
workerConnector.shutdown();
@@ -339,7 +361,6 @@ public class WorkerConnectorTest {
@Test
public void testStopFailure() {
RuntimeException exception = new RuntimeException();
- connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
@@ -352,7 +373,7 @@ public class WorkerConnectorTest {
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED,
onFirstStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED,
onSecondStateChange);
@@ -382,7 +403,6 @@ public class WorkerConnectorTest {
@Test
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
- connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
@@ -392,7 +412,7 @@ public class WorkerConnectorTest {
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
@@ -410,8 +430,6 @@ public class WorkerConnectorTest {
@Test
public void testTransitionStartedToStarted() {
- connector = sourceConnector;
-
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
@@ -419,7 +437,7 @@ public class WorkerConnectorTest {
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
@@ -439,14 +457,13 @@ public class WorkerConnectorTest {
@Test
public void testTransitionPausedToPaused() {
- connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
@@ -471,14 +488,13 @@ public class WorkerConnectorTest {
@Test
public void testTransitionStoppedToStopped() {
- connector = sourceConnector;
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- assertInitializedSourceMetric(workerConnector);
+ assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
@@ -503,13 +519,13 @@ public class WorkerConnectorTest {
@Test
public void testFailConnectorThatIsNeitherSourceNorSink() {
- connector = mock(Connector.class);
- when(connector.version()).thenReturn(VERSION);
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
+ Connector badConnector = mock(Connector.class);
+ when(badConnector.version()).thenReturn(VERSION);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
badConnector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
workerConnector.initialize();
- verify(connector).version();
+ verify(badConnector).version();
ArgumentCaptor<Throwable> exceptionCapture =
ArgumentCaptor.forClass(Throwable.class);
verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture());
Throwable e = exceptionCapture.getValue();
@@ -557,12 +573,19 @@ public class WorkerConnectorTest {
assertFalse(workerConnector.metrics().isRunning());
}
- protected void assertInitializedSinkMetric(WorkerConnector
workerConnector) {
- assertInitializedMetric(workerConnector, "sink");
- }
-
- protected void assertInitializedSourceMetric(WorkerConnector
workerConnector) {
- assertInitializedMetric(workerConnector, "source");
+ protected void assertInitializedMetric(WorkerConnector workerConnector) {
+ String expectedType;
+ switch (connectorType) {
+ case SINK:
+ expectedType = "sink";
+ break;
+ case SOURCE:
+ expectedType = "source";
+ break;
+ default:
+ throw new IllegalStateException("Unexpected connector type: "
+ connectorType);
+ }
+ assertInitializedMetric(workerConnector, expectedType);
}
protected void assertInitializedMetric(WorkerConnector workerConnector,
String expectedType) {
@@ -588,10 +611,10 @@ public class WorkerConnectorTest {
private void verifyInitialize() {
verify(connector).version();
- if (connector instanceof SourceConnector) {
+ if (connectorType == ConnectorType.SOURCE) {
verify(offsetStore).start();
verify(connector).initialize(any(SourceConnectorContext.class));
- } else {
+ } else if (connectorType == ConnectorType.SINK) {
verify(connector).initialize(any(SinkConnectorContext.class));
}
}
@@ -606,7 +629,7 @@ public class WorkerConnectorTest {
private void verifyShutdown(int connectorStops, boolean clean, boolean
started) {
verify(ctx).close();
- if (connector instanceof SourceConnector) {
+ if (connectorType == ConnectorType.SOURCE) {
verify(offsetStorageReader).close();
verify(offsetStore).stop();
}