This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new d8b6093 KAFKA-10792: Prevent source task shutdown from blocking
herder thread (#9669)
d8b6093 is described below
commit d8b60939b6e14b8cd47e92520b9299ce5dfde5e5
Author: Chris Egerton <[email protected]>
AuthorDate: Fri Dec 4 12:48:23 2020 -0500
KAFKA-10792: Prevent source task shutdown from blocking herder thread
(#9669)
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from
the task thread when the task is actually stopped (via `WorkerSourceTask::close`
just before `WorkerTask::run` completes), and only if an attempt has been made
to start the task (which will not be the case if it was created in the paused
state and then shut down before being started). This prevents
`SourceTask::stop` from being indirectly invoked on the herder's thread, which
can have adverse effects if the task is u [...]
Unit tests are tweaked where necessary to account for this new logic, which
covers some edge cases mentioned in PR #5020 that were unaddressed up until now.
The existing integration tests for blocking connectors are expanded to also
include cases for blocking source and sink tasks. Full coverage of every
source/sink task method is intentionally omitted from these expanded tests in
order to avoid inflating test runtime (each one adds an extra 5 seconds at
minimum) and because the tests that are added here were sufficient to reproduce
the bug with source task shutdown.
Author: Chris Egerton <[email protected]>
Reviewers: Nigel Liang <[email protected]>, Tom Bentley
<[email protected]>, Randall Hauch <[email protected]>
---
.../kafka/connect/runtime/WorkerSourceTask.java | 44 +-
.../connect/integration/BlockingConnectorTest.java | 566 +++++++++++++++++----
.../connect/runtime/ErrorHandlingTaskTest.java | 6 -
.../ErrorHandlingTaskWithTopicCreationTest.java | 6 -
4 files changed, 472 insertions(+), 150 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 1febd7f..57f6f98 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -102,9 +102,7 @@ class WorkerSourceTask extends WorkerTask {
private CountDownLatch stopRequestedLatch;
private Map<String, String> taskConfig;
- private boolean finishedStart = false;
- private boolean startedShutdownBeforeStartCompleted = false;
- private boolean stopped = false;
+ private boolean started = false;
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -166,8 +164,12 @@ class WorkerSourceTask extends WorkerTask {
@Override
protected void close() {
- if (!shouldPause()) {
- tryStop();
+ if (started) {
+ try {
+ task.stop();
+ } catch (Throwable t) {
+ log.warn("Could not stop task", t);
+ }
}
if (producer != null) {
try {
@@ -206,39 +208,21 @@ class WorkerSourceTask extends WorkerTask {
public void stop() {
super.stop();
stopRequestedLatch.countDown();
- synchronized (this) {
- if (finishedStart)
- tryStop();
- else
- startedShutdownBeforeStartCompleted = true;
- }
- }
-
- private synchronized void tryStop() {
- if (!stopped) {
- try {
- task.stop();
- stopped = true;
- } catch (Throwable t) {
- log.warn("Could not stop task", t);
- }
- }
}
@Override
public void execute() {
try {
+ // If we try to start the task at all by invoking initialize, then
count this as
+ // "started" and expect a subsequent call to the task's stop()
method
+ // to properly clean up any resources allocated by its
initialize() or
+ // start() methods. If the task throws an exception during stop(),
+ // the worst thing that happens is another exception gets logged
for an already-
+ // failed task
+ started = true;
task.initialize(new WorkerSourceTaskContext(offsetReader, this,
configState));
task.start(taskConfig);
log.info("{} Source task finished initialization and start", this);
- synchronized (this) {
- if (startedShutdownBeforeStartCompleted) {
- tryStop();
- return;
- }
- finishedStart = true;
- }
-
while (!isStopping()) {
if (shouldPause()) {
onPause();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 9a8e1fa..abc9a93 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -16,17 +16,26 @@
*/
package org.apache.kafka.connect.integration;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -36,16 +45,21 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertThrows;
@@ -67,6 +81,33 @@ public class BlockingConnectorTest {
private static final long RECORD_TRANSFER_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
private static final long REST_REQUEST_TIMEOUT =
Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;
+ private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
+ private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS =
"Connector::initializeWithTaskConfigs";
+ private static final String CONNECTOR_START = "Connector::start";
+ private static final String CONNECTOR_RECONFIGURE =
"Connector::reconfigure";
+ private static final String CONNECTOR_TASK_CLASS = "Connector::taskClass";
+ private static final String CONNECTOR_TASK_CONFIGS =
"Connector::taskConfigs";
+ private static final String CONNECTOR_STOP = "Connector::stop";
+ private static final String CONNECTOR_VALIDATE = "Connector::validate";
+ private static final String CONNECTOR_CONFIG = "Connector::config";
+ private static final String CONNECTOR_VERSION = "Connector::version";
+ private static final String TASK_START = "Task::start";
+ private static final String TASK_STOP = "Task::stop";
+ private static final String TASK_VERSION = "Task::version";
+ private static final String SINK_TASK_INITIALIZE = "SinkTask::initialize";
+ private static final String SINK_TASK_PUT = "SinkTask::put";
+ private static final String SINK_TASK_FLUSH = "SinkTask::flush";
+ private static final String SINK_TASK_PRE_COMMIT = "SinkTask::preCommit";
+ private static final String SINK_TASK_OPEN = "SinkTask::open";
+ private static final String SINK_TASK_ON_PARTITIONS_ASSIGNED =
"SinkTask::onPartitionsAssigned";
+ private static final String SINK_TASK_CLOSE = "SinkTask::close";
+ private static final String SINK_TASK_ON_PARTITIONS_REVOKED =
"SinkTask::onPartitionsRevoked";
+ private static final String SOURCE_TASK_INITIALIZE =
"SourceTask::initialize";
+ private static final String SOURCE_TASK_POLL = "SourceTask::poll";
+ private static final String SOURCE_TASK_COMMIT = "SourceTask::commit";
+ private static final String SOURCE_TASK_COMMIT_RECORD =
"SourceTask::commitRecord";
+ private static final String SOURCE_TASK_COMMIT_RECORD_WITH_METADATA =
"SourceTask::commitRecordWithMetadata";
+
private EmbeddedConnectCluster connect;
private ConnectorHandle normalConnectorHandle;
@@ -101,18 +142,18 @@ public class BlockingConnectorTest {
// stop all Connect, Kafka and Zk threads.
connect.stop();
ConnectorsResource.resetRequestTimeout();
- BlockingConnector.resetBlockLatch();
+ Block.resetBlockLatch();
}
@Test
public void testBlockInConnectorValidate() throws Exception {
log.info("Starting test testBlockInConnectorValidate");
- assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ValidateBlockingConnector.class));
+ assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE));
// Will NOT assert that connector has failed, since the request should
fail before it's even created
// Connector should already be blocked so this should return
immediately, but check just to
// make sure that it actually did block
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -121,12 +162,12 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorConfig() throws Exception {
log.info("Starting test testBlockInConnectorConfig");
- assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ConfigBlockingConnector.class));
+ assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG));
// Will NOT assert that connector has failed, since the request should
fail before it's even created
// Connector should already be blocked so this should return
immediately, but check just to
// make sure that it actually did block
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -135,8 +176,8 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorInitialize() throws Exception {
log.info("Starting test testBlockInConnectorInitialize");
- createConnectorWithBlock(InitializeBlockingConnector.class);
- BlockingConnector.waitForBlock();
+ createConnectorWithBlock(InitializeBlockingConnector.class,
CONNECTOR_INITIALIZE);
+ Block.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -145,8 +186,8 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorStart() throws Exception {
log.info("Starting test testBlockInConnectorStart");
- createConnectorWithBlock(BlockingConnector.START);
- BlockingConnector.waitForBlock();
+ createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
+ Block.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -155,10 +196,54 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorStop() throws Exception {
log.info("Starting test testBlockInConnectorStop");
- createConnectorWithBlock(BlockingConnector.STOP);
+ createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
+ waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
+ connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
+ Block.waitForBlock();
+
+ createNormalConnector();
+ verifyNormalConnector();
+ }
+
+ @Test
+ public void testBlockInSourceTaskStart() throws Exception {
+ log.info("Starting test testBlockInSourceTaskStart");
+ createConnectorWithBlock(BlockingSourceConnector.class, TASK_START);
+ Block.waitForBlock();
+
+ createNormalConnector();
+ verifyNormalConnector();
+ }
+
+ @Test
+ public void testBlockInSourceTaskStop() throws Exception {
+ log.info("Starting test testBlockInSourceTaskStop");
+ createConnectorWithBlock(BlockingSourceConnector.class, TASK_STOP);
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
+
+ createNormalConnector();
+ verifyNormalConnector();
+ }
+
+ @Test
+ public void testBlockInSinkTaskStart() throws Exception {
+ log.info("Starting test testBlockInSinkTaskStart");
+ createConnectorWithBlock(BlockingSinkConnector.class, TASK_START);
+ Block.waitForBlock();
+
+ createNormalConnector();
+ verifyNormalConnector();
+ }
+
+ @Test
+ public void testBlockInSinkTaskStop() throws Exception {
+ log.info("Starting test testBlockInSinkTaskStop");
+ createConnectorWithBlock(BlockingSinkConnector.class, TASK_STOP);
+ waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
+ connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
+ Block.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -167,50 +252,41 @@ public class BlockingConnectorTest {
@Test
public void testWorkerRestartWithBlockInConnectorStart() throws Exception {
log.info("Starting test testWorkerRestartWithBlockInConnectorStart");
- createConnectorWithBlock(BlockingConnector.START);
+ createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
// First instance of the connector should block on startup
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
createNormalConnector();
connect.removeWorker();
connect.addWorker();
// After stopping the only worker and restarting it, a new instance of
the blocking
// connector should be created and we can ensure that it blocks again
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
verifyNormalConnector();
}
@Test
public void testWorkerRestartWithBlockInConnectorStop() throws Exception {
log.info("Starting test testWorkerRestartWithBlockInConnectorStop");
- createConnectorWithBlock(BlockingConnector.STOP);
+ createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
createNormalConnector();
waitForConnectorStart(NORMAL_CONNECTOR_NAME);
connect.removeWorker();
- BlockingConnector.waitForBlock();
+ Block.waitForBlock();
connect.addWorker();
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
verifyNormalConnector();
}
- private void createConnectorWithBlock(String block) {
- Map<String, String> props = baseBlockingConnectorProps();
- props.put(BlockingConnector.BLOCK_CONFIG, block);
- log.info("Creating connector with block during {}", block);
- try {
- connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
- } catch (RuntimeException e) {
- log.info("Failed to create connector", e);
- throw e;
- }
- }
-
- private void createConnectorWithBlock(Class<? extends BlockingConnector>
connectorClass) {
- Map<String, String> props = baseBlockingConnectorProps();
+ private void createConnectorWithBlock(Class<? extends Connector>
connectorClass, String block) {
+ Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, connectorClass.getName());
- log.info("Creating blocking connector of type {}",
connectorClass.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, "1");
+ props.put(TOPICS_CONFIG, "t1"); // Required for sink connectors
+ props.put(Block.BLOCK_CONFIG, Objects.requireNonNull(block));
+ log.info("Creating blocking connector of type {} with block in {}",
connectorClass.getSimpleName(), block);
try {
connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
} catch (RuntimeException e) {
@@ -219,13 +295,6 @@ public class BlockingConnectorTest {
}
}
- private Map<String, String> baseBlockingConnectorProps() {
- Map<String, String> result = new HashMap<>();
- result.put(CONNECTOR_CLASS_CONFIG, BlockingConnector.class.getName());
- result.put(TASKS_MAX_CONFIG, "1");
- return result;
- }
-
private void createNormalConnector() {
connect.kafka().createTopic(TEST_TOPIC, 3);
@@ -263,63 +332,43 @@ public class BlockingConnectorTest {
normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
}
- public static class BlockingConnector extends SourceConnector {
-
+ private static class Block {
private static CountDownLatch blockLatch;
- private String block;
+ private final String block;
public static final String BLOCK_CONFIG = "block";
- public static final String INITIALIZE = "initialize";
- public static final String INITIALIZE_WITH_TASK_CONFIGS =
"initializeWithTaskConfigs";
- public static final String START = "start";
- public static final String RECONFIGURE = "reconfigure";
- public static final String TASK_CLASS = "taskClass";
- public static final String TASK_CONFIGS = "taskConfigs";
- public static final String STOP = "stop";
- public static final String VALIDATE = "validate";
- public static final String CONFIG = "config";
- public static final String VERSION = "version";
-
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(
- BLOCK_CONFIG,
- ConfigDef.Type.STRING,
- "",
- ConfigDef.Importance.MEDIUM,
- "Where to block indefinitely, e.g., 'start', 'initialize',
'taskConfigs', 'version'"
- );
-
- // No-args constructor required by the framework
- public BlockingConnector() {
- this(null);
- }
-
- protected BlockingConnector(String block) {
- this.block = block;
- synchronized (BlockingConnector.class) {
- if (blockLatch != null) {
- blockLatch.countDown();
- }
- blockLatch = new CountDownLatch(1);
- }
+ private static ConfigDef config() {
+ return new ConfigDef()
+ .define(
+ BLOCK_CONFIG,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.MEDIUM,
+ "Where to block indefinitely, e.g., 'Connector::start',
'Connector::initialize', "
+ + "'Connector::taskConfigs', 'Task::version',
'SinkTask::put', 'SourceTask::poll'"
+ );
}
public static void waitForBlock() throws InterruptedException {
- synchronized (BlockingConnector.class) {
+ synchronized (Block.class) {
if (blockLatch == null) {
throw new IllegalArgumentException("No connector has been
created yet");
}
}
-
+
log.debug("Waiting for connector to block");
blockLatch.await();
log.debug("Connector should now be blocked");
}
+ // Note that there is only ever at most one global block latch at a
time, which makes tests that
+ // use blocks in multiple places impossible. If necessary, this can be
addressed in the future by
+ // adding support for multiple block latches at a time, possibly
identifiable by a connector/task
+ // ID, the location of the expected block, or both.
public static void resetBlockLatch() {
- synchronized (BlockingConnector.class) {
+ synchronized (Block.class) {
if (blockLatch != null) {
blockLatch.countDown();
blockLatch = null;
@@ -327,81 +376,114 @@ public class BlockingConnectorTest {
}
}
+ public Block(Map<String, String> props) {
+ this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG));
+ }
+
+ public Block(String block) {
+ this.block = block;
+ synchronized (Block.class) {
+ if (blockLatch != null) {
+ blockLatch.countDown();
+ }
+ blockLatch = new CountDownLatch(1);
+ }
+ }
+
+ public Map<String, String> taskConfig() {
+ return Collections.singletonMap(BLOCK_CONFIG, block);
+ }
+
+ public void maybeBlockOn(String block) {
+ if (block.equals(this.block)) {
+ log.info("Will block on {}", block);
+ blockLatch.countDown();
+ while (true) {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // No-op. Just keep blocking.
+ }
+ }
+ } else {
+ log.debug("Will not block on {}", block);
+ }
+ }
+ }
+
+ // Used to test blocks in Connector (as opposed to Task) methods
+ public static class BlockingConnector extends SourceConnector {
+
+ private Block block;
+
+ // No-args constructor required by the framework
+ public BlockingConnector() {
+ this(null);
+ }
+
+ protected BlockingConnector(String block) {
+ this.block = new Block(block);
+ }
+
@Override
public void initialize(ConnectorContext ctx) {
- maybeBlockOn(INITIALIZE);
+ block.maybeBlockOn(CONNECTOR_INITIALIZE);
super.initialize(ctx);
}
@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>>
taskConfigs) {
- maybeBlockOn(INITIALIZE_WITH_TASK_CONFIGS);
+ block.maybeBlockOn(CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS);
super.initialize(ctx, taskConfigs);
}
@Override
public void start(Map<String, String> props) {
- this.block = new AbstractConfig(CONFIG_DEF,
props).getString(BLOCK_CONFIG);
- maybeBlockOn(START);
+ this.block = new Block(props);
+ block.maybeBlockOn(CONNECTOR_START);
}
@Override
public void reconfigure(Map<String, String> props) {
+ block.maybeBlockOn(CONNECTOR_RECONFIGURE);
super.reconfigure(props);
- maybeBlockOn(RECONFIGURE);
}
@Override
public Class<? extends Task> taskClass() {
- maybeBlockOn(TASK_CLASS);
+ block.maybeBlockOn(CONNECTOR_TASK_CLASS);
return BlockingTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- maybeBlockOn(TASK_CONFIGS);
+ block.maybeBlockOn(CONNECTOR_TASK_CONFIGS);
return Collections.singletonList(Collections.emptyMap());
}
@Override
public void stop() {
- maybeBlockOn(STOP);
+ block.maybeBlockOn(CONNECTOR_STOP);
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
- maybeBlockOn(VALIDATE);
+ block.maybeBlockOn(CONNECTOR_VALIDATE);
return super.validate(connectorConfigs);
}
@Override
public ConfigDef config() {
- maybeBlockOn(CONFIG);
- return CONFIG_DEF;
+ block.maybeBlockOn(CONNECTOR_CONFIG);
+ return Block.config();
}
@Override
public String version() {
- maybeBlockOn(VERSION);
+ block.maybeBlockOn(CONNECTOR_VERSION);
return "0.0.0";
}
- protected void maybeBlockOn(String block) {
- if (block.equals(this.block)) {
- log.info("Will block on {}", block);
- blockLatch.countDown();
- while (true) {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- // No-op. Just keep blocking.
- }
- }
- } else {
- log.debug("Will not block on {}", block);
- }
- }
-
public static class BlockingTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
@@ -426,19 +508,287 @@ public class BlockingConnectorTest {
// Some methods are called before Connector::start, so we use this as a
workaround
public static class InitializeBlockingConnector extends BlockingConnector {
public InitializeBlockingConnector() {
- super(INITIALIZE);
+ super(CONNECTOR_INITIALIZE);
}
}
public static class ConfigBlockingConnector extends BlockingConnector {
public ConfigBlockingConnector() {
- super(CONFIG);
+ super(CONNECTOR_CONFIG);
}
}
public static class ValidateBlockingConnector extends BlockingConnector {
public ValidateBlockingConnector() {
- super(VALIDATE);
+ super(CONNECTOR_VALIDATE);
+ }
+ }
+
+ // Used to test blocks in SourceTask methods
+ public static class BlockingSourceConnector extends SourceConnector {
+
+ private Map<String, String> props;
+ private final Class<? extends BlockingSourceTask> taskClass;
+
+ // No-args constructor required by the framework
+ public BlockingSourceConnector() {
+ this(BlockingSourceTask.class);
+ }
+
+ protected BlockingSourceConnector(Class<? extends BlockingSourceTask>
taskClass) {
+ this.taskClass = taskClass;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.props = props;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return taskClass;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return IntStream.range(0, maxTasks)
+ .mapToObj(i -> new HashMap<>(props))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public Config validate(Map<String, String> connectorConfigs) {
+ return super.validate(connectorConfigs);
+ }
+
+ @Override
+ public ConfigDef config() {
+ return Block.config();
+ }
+
+ @Override
+ public String version() {
+ return "0.0.0";
+ }
+
+ public static class BlockingSourceTask extends SourceTask {
+ private Block block;
+
+ // No-args constructor required by the framework
+ public BlockingSourceTask() {
+ this(null);
+ }
+
+ protected BlockingSourceTask(String block) {
+ this.block = new Block(block);
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.block = new Block(props);
+ block.maybeBlockOn(TASK_START);
+ }
+
+ @Override
+ public List<SourceRecord> poll() {
+ block.maybeBlockOn(SOURCE_TASK_POLL);
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ block.maybeBlockOn(TASK_STOP);
+ }
+
+ @Override
+ public String version() {
+ block.maybeBlockOn(TASK_VERSION);
+ return "0.0.0";
+ }
+
+ @Override
+ public void initialize(SourceTaskContext context) {
+ block.maybeBlockOn(SOURCE_TASK_INITIALIZE);
+ super.initialize(context);
+ }
+
+ @Override
+ public void commit() throws InterruptedException {
+ block.maybeBlockOn(SOURCE_TASK_COMMIT);
+ super.commit();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void commitRecord(SourceRecord record) throws
InterruptedException {
+ block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
+ super.commitRecord(record);
+ }
+
+ @Override
+ public void commitRecord(SourceRecord record, RecordMetadata
metadata) throws InterruptedException {
+ block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
+ super.commitRecord(record, metadata);
+ }
+ }
+ }
+
+ public static class TaskInitializeBlockingSourceConnector extends
BlockingSourceConnector {
+ public TaskInitializeBlockingSourceConnector() {
+ super(InitializeBlockingSourceTask.class);
+ }
+
+ public static class InitializeBlockingSourceTask extends
BlockingSourceTask {
+ public InitializeBlockingSourceTask() {
+ super(SOURCE_TASK_INITIALIZE);
+ }
+ }
+ }
+
+ // Used to test blocks in SinkTask methods
+ public static class BlockingSinkConnector extends SinkConnector {
+
+ private Map<String, String> props;
+ private final Class<? extends BlockingSinkTask> taskClass;
+
+ // No-args constructor required by the framework
+ public BlockingSinkConnector() {
+ this(BlockingSinkTask.class);
+ }
+
+ protected BlockingSinkConnector(Class<? extends BlockingSinkTask>
taskClass) {
+ this.taskClass = taskClass;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.props = props;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return taskClass;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return IntStream.rangeClosed(0, maxTasks)
+ .mapToObj(i -> new HashMap<>(props))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public Config validate(Map<String, String> connectorConfigs) {
+ return super.validate(connectorConfigs);
+ }
+
+ @Override
+ public ConfigDef config() {
+ return Block.config();
+ }
+
+ @Override
+ public String version() {
+ return "0.0.0";
+ }
+
+ public static class BlockingSinkTask extends SinkTask {
+ private Block block;
+
+ // No-args constructor required by the framework
+ public BlockingSinkTask() {
+ this(null);
+ }
+
+ protected BlockingSinkTask(String block) {
+ this.block = new Block(block);
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.block = new Block(props);
+ block.maybeBlockOn(TASK_START);
+ }
+
+ @Override
+ public void put(Collection<SinkRecord> records) {
+ block.maybeBlockOn(SINK_TASK_PUT);
+ }
+
+ @Override
+ public void stop() {
+ block.maybeBlockOn(TASK_STOP);
+ }
+
+ @Override
+ public String version() {
+ block.maybeBlockOn(TASK_VERSION);
+ return "0.0.0";
+ }
+
+ @Override
+ public void initialize(SinkTaskContext context) {
+ block.maybeBlockOn(SINK_TASK_INITIALIZE);
+ super.initialize(context);
+ }
+
+ @Override
+ public void flush(Map<TopicPartition, OffsetAndMetadata>
currentOffsets) {
+ block.maybeBlockOn(SINK_TASK_FLUSH);
+ super.flush(currentOffsets);
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata>
preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ block.maybeBlockOn(SINK_TASK_PRE_COMMIT);
+ return super.preCommit(currentOffsets);
+ }
+
+ @Override
+ public void open(Collection<TopicPartition> partitions) {
+ block.maybeBlockOn(SINK_TASK_OPEN);
+ super.open(partitions);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_ASSIGNED);
+ super.onPartitionsAssigned(partitions);
+ }
+
+ @Override
+ public void close(Collection<TopicPartition> partitions) {
+ block.maybeBlockOn(SINK_TASK_CLOSE);
+ super.close(partitions);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_REVOKED);
+ super.onPartitionsRevoked(partitions);
+ }
+ }
+ }
+
+ public static class TaskInitializeBlockingSinkConnector extends
BlockingSinkConnector {
+ public TaskInitializeBlockingSinkConnector() {
+ super(InitializeBlockingSinkTask.class);
+ }
+
+ public static class InitializeBlockingSinkTask extends
BlockingSinkTask {
+ public InitializeBlockingSinkTask() {
+ super(SINK_TASK_INITIALIZE);
+ }
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index c471b03..70bbfc6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -236,9 +236,6 @@ public class ErrorHandlingTaskTest {
createSourceTask(initialState, retryWithToleranceOperator);
- sourceTask.stop();
- PowerMock.expectLastCall();
-
expectClose();
reporter.close();
@@ -263,9 +260,6 @@ public class ErrorHandlingTaskTest {
createSourceTask(initialState, retryWithToleranceOperator);
- sourceTask.stop();
- PowerMock.expectLastCall();
-
expectClose();
// Even though the reporters throw exceptions, they should both still
be closed.
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index aba6445..9c115ac 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -250,9 +250,6 @@ public class ErrorHandlingTaskWithTopicCreationTest {
createSourceTask(initialState, retryWithToleranceOperator);
- sourceTask.stop();
- PowerMock.expectLastCall();
-
expectClose();
reporter.close();
@@ -277,9 +274,6 @@ public class ErrorHandlingTaskWithTopicCreationTest {
createSourceTask(initialState, retryWithToleranceOperator);
- sourceTask.stop();
- PowerMock.expectLastCall();
-
expectClose();
// Even though the reporters throw exceptions, they should both still
be closed.