[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177973#comment-17177973 ]

Ewen Cheslack-Postava commented on KAFKA-5896:
----------------------------------------------

[~yazgoo] It looks like this was marked "Abandoned". Reiterating my previous comment:

> Really my complaint is that Java's interrupt semantics are terrible and try
> to give the feeling of preemption when it can't and therefore leaves a ton of
> bugs and overpromised underdelivered guarantees in its wake.

It's fine to reopen if you think it needs more discussion, I just don't see a way to actually fix the issue – Thread.interrupt doesn't do what we'd want and afaik the jvm doesn't provide anything that does. So I think given those constraints, it's probably better to identify the connector that is behaving badly and work with upstream to address it.

> Kafka Connect task threads never interrupted
> --------------------------------------------
>
>                 Key: KAFKA-5896
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5896
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Nick Pillitteri
>            Assignee: Nick Pillitteri
>            Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads.
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to
> indicate the task should stop processing records. However, if the thread the
> task is running in is blocked (waiting for a lock or performing I/O) it's
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with
> some more detailed instructions for reproducing the issue):
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved
> connector (any connector that does I/O without timeouts) can cause the Kafka
> Connect worker to get into a state where the only solution is to restart the
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on
> Stack Overflow:
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is
> running in. In the past across various other libraries, this is what I've
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run
> indefinitely. It uses a timeout while waiting for the task to stop but after
> this timeout has expired it simply sets a {{cancelled}} flag. This means that
> every time a task is restarted, a new thread running the task will be
> created. Thus a task may end up with multiple instances all running in their
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here:
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to
> gracefully shutdown here:
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and
> submit a PR if people agree that this is a bug and interrupting threads is
> the right fix.
> Thanks!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
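The "Problem" section of the quoted issue can be illustrated with a small, self-contained sketch. It is not Kafka Connect code: the class `StoppingFlagDemo`, the `stopping` field, and the empty queue are hypothetical stand-ins for a task thread that is blocked when the stop request arrives. It shows that a cooperative flag is never re-checked while the thread is blocked, and that an interrupt helps only because `BlockingQueue.take()` happens to be an interruptible call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a task thread; not Kafka Connect code. */
final class StoppingFlagDemo {
    private static volatile boolean stopping = false;

    /** Returns {alive after the flag is set, alive after interrupt()}. */
    static boolean[] run() {
        stopping = false;
        BlockingQueue<String> records = new LinkedBlockingQueue<>(); // never receives data
        CountDownLatch inLoop = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            while (!stopping) {
                inLoop.countDown();
                try {
                    records.take(); // blocks; the flag is not re-checked while blocked
                } catch (InterruptedException e) {
                    return; // an interruptible call lets the thread exit promptly
                }
            }
        }, "demo-task");
        task.setDaemon(true);
        task.start();
        try {
            inLoop.await();      // the thread is now past the flag check
            stopping = true;     // cooperative stop request
            task.join(300);      // graceful wait, analogous to a stop timeout
            boolean aliveAfterFlag = task.isAlive();
            task.interrupt();    // forced stop
            task.join(2000);
            return new boolean[] {aliveAfterFlag, task.isAlive()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("alive after stopping flag: " + r[0]);
        System.out.println("alive after interrupt:     " + r[1]);
    }
}
```

This is the favorable case. A thread blocked in a call that does not respond to interruption would survive both steps, which is the limitation described in the comment above.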
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177821#comment-17177821 ]

yazgoo commented on KAFKA-5896:
-------------------------------

I still encountered this exception in CP 2.5, shouldn't this be re-opened?
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459799#comment-16459799 ]

ASF GitHub Bot commented on KAFKA-5896:
---------------------------------------

56quarters closed pull request #3876: KAFKA-5896: Force Connect tasks to stop via thread interruption
URL: https://github.com/apache/kafka/pull/3876

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c6e2e173834..b9f96128f6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -55,6 +55,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 
 /**
@@ -83,6 +84,9 @@
     private final ConcurrentMap connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap tasks = new ConcurrentHashMap<>();
 
+    private final ConcurrentMap> futures = new ConcurrentHashMap<>();
+    private final Object taskAndFutureLock = new Object();
+
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
     public Worker(
@@ -414,11 +418,14 @@ public boolean startTask(
             return false;
         }
 
-        WorkerTask existing = tasks.putIfAbsent(id, workerTask);
-        if (existing != null)
-            throw new ConnectException("Task already exists in this worker: " + id);
+        synchronized (taskAndFutureLock) {
+            WorkerTask existing = tasks.putIfAbsent(id, workerTask);
+            if (existing != null)
+                throw new ConnectException("Task already exists in this worker: " + id);
+
+            futures.put(id, executor.submit(workerTask));
+        }
 
-        executor.submit(workerTask);
         if (workerTask instanceof WorkerSourceTask) {
             sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
         }
@@ -483,18 +490,41 @@ private void stopTasks(Collection ids) {
     }
 
     private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
-        WorkerTask task = tasks.remove(taskId);
+        WorkerTask task;
+        Future future;
+
+        synchronized (taskAndFutureLock) {
+            task = tasks.remove(taskId);
+            future = futures.remove(taskId);
+        }
+
         if (task == null) {
             log.warn("Ignoring await stop request for non-present task {}", taskId);
             return;
         }
 
         if (!task.awaitStop(timeout)) {
-            log.error("Graceful stop of task {} failed.", task.id());
+            log.error("Graceful stop of task {} failed. Cancelling and forcibly interrupting.", task.id());
             task.cancel();
+
+            if (future == null) {
+                log.warn("No associated Future found for task {}", taskId);
+                return;
+            }
+
+            // Interrupt the thread that the task is running in since it hasn't stopped on its
+            // own by this point. This prevents scenarios where a task runs indefinitely because
+            // it's blocked on something (lock, network I/O, etc.).
+            future.cancel(true);
         }
     }
 
+    // Visible for testing
+    boolean isTaskFutureRunning(ConnectorTaskId taskId) {
+        final Future future = futures.get(taskId);
+        return future != null && !future.isDone();
+    }
+
     private void awaitStopTasks(Collection ids) {
         long now = time.milliseconds();
         long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 80c65df7ff4..361cec77a94 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -34,6 +34,8 @@
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -57,14 +59,18 @@
 import org.powermock.modules.junit4.PowerMockRunner;
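The mechanism the diff above relies on is standard `java.util.concurrent` behaviour: keep the `Future` returned by `ExecutorService.submit()` and call `cancel(true)` to interrupt the thread running the task. The following is a minimal sketch of that mechanism only, assuming a task blocked in an interruptible call. The class `CancelFutureDemo` and the latch names are hypothetical and do not appear in the patch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the submit / keep Future / cancel(true) pattern. */
final class CancelFutureDemo {

    /** Returns true if the blocked task observed the interrupt and exited. */
    static boolean cancelBlockedTask() {
        ExecutorService executor = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            // Keep the Future so the task's thread can be interrupted later.
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    neverReleased.await(); // stands in for a blocked, interruptible call
                } catch (InterruptedException e) {
                    // interrupted by cancel(true); fall through and exit
                } finally {
                    exited.countDown();
                }
            });
            started.await();
            // mayInterruptIfRunning = true interrupts the worker thread.
            future.cancel(true);
            return exited.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task exited after cancel(true): " + cancelBlockedTask());
    }
}
```

The sketch uses a separate latch to observe the exit because `Future.isDone()` returns true as soon as `cancel()` succeeds, so it does not by itself show that the underlying thread has stopped.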
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371896#comment-16371896 ]

Ewen Cheslack-Postava commented on KAFKA-5896:
----------------------------------------------

[~rhauch] I don't have a strong opinion. I don't think it will really solve the problem since a task not respecting a request to stop probably indicates a buggy implementation and it could very easily be doing some blocking operation that won't respond to the interrupt request either (or the implementer might catch and ignore InterruptedException in their code because they are forced to handle it and don't know what to do with it, or probably other buggy implementation issues that would cause this to not work). But it doesn't much matter if this is included.

Really my complaint is that Java's interrupt semantics are terrible and try to give the feeling of preemption when it can't and therefore leaves a ton of bugs and overpromised underdelivered guarantees in its wake.
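The failure mode described in this comment, a task that catches and ignores InterruptedException, can be sketched as follows. This is a hypothetical illustration, not code from any real connector: `SwallowedInterruptDemo` and the `release` flag are made up, and the flag exists only so the demo thread can be stopped at the end.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical task that swallows interrupts; not taken from a real connector. */
final class SwallowedInterruptDemo {

    /** Returns {future.isDone() after cancel, task still running after cancel}. */
    static boolean[] run() {
        ExecutorService executor = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "stubborn-task");
            t.setDaemon(true); // keep the demo from blocking JVM exit
            return t;
        });
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                while (!release.get()) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // The bug being illustrated: the interrupt is swallowed,
                        // the flag is not restored, and the loop carries on.
                    }
                }
                exited.countDown();
            });
            started.await();
            future.cancel(true);                // interrupts the worker thread
            boolean done = future.isDone();     // true once cancel() has succeeded
            boolean stillRunning = !exited.await(300, TimeUnit.MILLISECONDS);
            release.set(true);                  // let the demo thread finish
            exited.await(2, TimeUnit.SECONDS);
            return new boolean[] {done, stillRunning};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("future.isDone():    " + r[0]);
        System.out.println("task still running: " + r[1]);
    }
}
```

The `Future` reports done while the thread keeps running, so a check based on `isDone()` cannot distinguish this case from a task that really stopped.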
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358670#comment-16358670 ]

Randall Hauch commented on KAFKA-5896:
--------------------------------------

[~ewencp], do you have any thoughts on this? I know in the past you've talked about not wanting to do this since some developers won't properly implement the interruption. I agree that not everyone implements it correctly, but we could at least _try_ to cancel the tasks.
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226849#comment-16226849 ]

Nick Pillitteri commented on KAFKA-5896:
----------------------------------------

I believe this PR is ready to merge if people are OK with the solution of interrupting threads that do not stop in time.
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188495#comment-16188495 ]

Nick Pillitteri commented on KAFKA-5896:
----------------------------------------

I've addressed the issues pointed out in the PR. Is there anything else people would like to see done here? Thanks.
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169380#comment-16169380 ]

ASF GitHub Bot commented on KAFKA-5896:
---------------------------------------

Github user 56quarters closed the pull request at:

    https://github.com/apache/kafka/pull/3876
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169381#comment-16169381 ]

ASF GitHub Bot commented on KAFKA-5896:
---------------------------------------

GitHub user 56quarters reopened a pull request:

    https://github.com/apache/kafka/pull/3876

    KAFKA-5896: Force Connect tasks to stop via thread interruption

    Interrupt the thread of Kafka Connect tasks that do not stop within
    the timeout via `Worker::stopAndAwaitTasks()`. Previously tasks would
    be asked to stop via setting a `stopping` flag. It was possible for
    tasks to ignore this flag if they were, for example, waiting for a
    lock or blocked on I/O.

    This prevents issues where tasks may end up with multiple threads
    all running and attempting to make progress when there should only
    be a single thread running for that task at a time.

    Fixes KAFKA-5896

    /cc @rhauch @tedyu

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smarter-travel-media/kafka force-task-stop

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3876

commit 31c879c1a1f0bd4f5999c021baca8e99e733ffe1
Author: Nick Pillitteri
Date:   2017-09-13T14:54:40Z

    Force Connect tasks to stop via thread interruption after a timeout

    Interrupt the thread of Kafka Connect tasks that do not stop within
    the timeout via Worker::stopAndAwaitTasks(). Previously tasks would
    be asked to stop via setting a `stopping` flag. It was possible for
    tasks to ignore this flag if they were, for example, waiting for a
    lock or blocked on I/O.

    This prevents issues where tasks may end up with multiple threads
    all running and attempting to make progress when there should only
    be a single thread running for that task at a time.

    Fixes KAFKA-5896

commit ed3ef9c1f139cae4f09eefe1f66edc1c58c5ace6
Author: Nick Pillitteri
Date:   2017-09-15T19:54:15Z

    Rename per CR

commit afea834c708eae2b7d3dedbdeafed83197eaed94
Author: Nick Pillitteri
Date:   2017-09-16T04:52:57Z

    Rename per CR
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168318#comment-16168318 ]

ASF GitHub Bot commented on KAFKA-5896:
---

GitHub user 56quarters opened a pull request:

    https://github.com/apache/kafka/pull/3876

    Force Connect tasks to stop via thread interruption after a timeout

    Interrupt the thread of Kafka Connect tasks that do not stop within
    the timeout via `Worker::stopAndAwaitTasks()`. Previously tasks would
    be asked to stop via setting a `stopping` flag. It was possible for
    tasks to ignore this flag if they were, for example, waiting for a
    lock or blocked on I/O.

    This prevents issues where tasks may end up with multiple threads all
    running and attempting to make progress when there should only be a
    single thread running for that task at a time.

    Fixes KAFKA-5896

    /cc @rhauch @tedyu

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smarter-travel-media/kafka force-task-stop

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3876

commit 31c879c1a1f0bd4f5999c021baca8e99e733ffe1
Author: Nick Pillitteri
Date:   2017-09-13T14:54:40Z

    Force Connect tasks to stop via thread interruption after a timeout

    Interrupt the thread of Kafka Connect tasks that do not stop within
    the timeout via Worker::stopAndAwaitTasks(). Previously tasks would
    be asked to stop via setting a `stopping` flag. It was possible for
    tasks to ignore this flag if they were, for example, waiting for a
    lock or blocked on I/O.

    This prevents issues where tasks may end up with multiple threads all
    running and attempting to make progress when there should only be a
    single thread running for that task at a time.

    Fixes KAFKA-5896

> Kafka Connect task threads never interrupted
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Nick Pillitteri
> Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads.
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to
> indicate the task should stop processing records. However, if the thread the
> task is running in is blocked (waiting for a lock or performing I/O) it's
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with
> some more detailed instructions for reproducing the issue):
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved
> connector (any connector that does I/O without timeouts) can cause the Kafka
> Connect worker to get into a state where the only solution is to restart the
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on
> Stack Overflow:
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is
> running in. In the past across various other libraries, this is what I've
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run
> indefinitely. It uses a timeout while waiting for the task to stop but after
> this timeout has expired it simply sets a {{cancelled}} flag. This means that
> every time a task is restarted, a new thread running the task will be
> created. Thus a task may end up with multiple instances all running in their
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here:
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to
> gracefully shutdown here:
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and
> submit a PR if people agree that this is a bug and interrupting threads is
> the right fix.
> Thanks!

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
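
The stop sequence described in the pull request (set the `stopping` flag, wait up to a timeout, then interrupt the task's thread) can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the patch: `BlockingTask`, `stopAndAwait`, `demo` and the 500 ms timeout are hypothetical names and values, and a `CountDownLatch` that is never released stands in for a lock or I/O call without a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptAfterTimeoutSketch {

    /** Hypothetical stand-in for a Connect task; not the real worker task class. */
    static final class BlockingTask implements Runnable {
        private volatile boolean stopping = false;
        private final CountDownLatch entered = new CountDownLatch(1);
        private final CountDownLatch neverReleased = new CountDownLatch(1);
        private final CountDownLatch finished = new CountDownLatch(1);
        private final AtomicBoolean interrupted = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                while (!stopping) {
                    // Signal that the flag check has passed and the thread is about to block.
                    entered.countDown();
                    // Simulates a blocking call with no timeout: the flag is not re-checked here.
                    neverReleased.await();
                }
            } catch (InterruptedException e) {
                // Only an interrupt gets the thread out of the blocking call.
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        }

        void stop() { stopping = true; }

        void awaitEntered() throws InterruptedException { entered.await(); }

        boolean awaitStop(long timeoutMs) throws InterruptedException {
            return finished.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        boolean wasInterrupted() { return interrupted.get(); }
    }

    /** Returns true if the task stopped on its own, false if it had to be interrupted. */
    static boolean stopAndAwait(BlockingTask task, Thread thread, long timeoutMs)
            throws InterruptedException {
        task.stop();                      // step 1: cooperative request via the flag
        if (task.awaitStop(timeoutMs)) {  // step 2: bounded wait for a graceful stop
            return true;
        }
        thread.interrupt();               // step 3: fall back to interrupting the thread
        thread.join(timeoutMs);           // bounded wait: an interrupt is only a request
        return false;
    }

    /** Runs the scenario once and reports what happened. */
    static String demo() {
        BlockingTask task = new BlockingTask();
        Thread thread = new Thread(task, "task-thread-sketch");
        thread.start();
        try {
            task.awaitEntered();
            boolean graceful = stopAndAwait(task, thread, 500);
            return "graceful=" + graceful + " interrupted=" + task.wasInterrupted();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "graceful=false interrupted=true"
    }
}
```

The sketch only works because `CountDownLatch.await()` responds to interruption. A thread blocked acquiring a `synchronized` monitor or in a classic `java.io` stream read does not, so interrupting it would not make it stop.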
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167174#comment-16167174 ] Nick Pillitteri commented on KAFKA-5896: [~rhauch] sure!
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167008#comment-16167008 ] Randall Hauch commented on KAFKA-5896: -- [~56quarters], can you please @rhauch on the PR? Thanks!
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166977#comment-16166977 ] Nick Pillitteri commented on KAFKA-5896: Sure, will do!
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166907#comment-16166907 ] Ted Yu commented on KAFKA-5896: --- Good find. Mind sending a PR (hopefully with test showing the effectiveness).