[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374 ]
Robert Yokota edited comment on KAFKA-6566 at 5/11/18 6:04 PM:
---------------------------------------------------------------

I've just started looking at this, but it looks like the correct place to put the call to {{task.stop()}} is in {{WorkerSourceTask.close()}}. This would mirror the call to {{task.stop()}} in {{WorkerSinkTask.close()}}. {{close()}} is called in a finally block in {{WorkerTask.doRun()}} here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176

There is another possible change I am looking at: putting a call to {{task.stop()}} in {{WorkerSinkTask.stop()}}. This would mirror the call to {{task.stop()}} in {{WorkerSourceTask.stop()}}. Ideally the source and sink would be symmetrical, to make them easier to reason about, especially for Connect developers.

The above changes assume that {{task.stop()}} is idempotent for both the source and sink.

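The idea in the comment above (calling {{stop()}} from a {{close()}} that runs in a finally block, and relying on {{stop()}} being idempotent) could be sketched roughly as follows. This is only an illustration under stated assumptions: {{SketchTask}}, {{FailingTask}} and {{SketchWorker}} are hypothetical stand-ins invented for this example, not the actual Kafka Connect classes, and the real {{WorkerSourceTask}} does considerably more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopOnCloseSketch {

    /** Hypothetical stand-in for a source task; NOT the Kafka Connect SourceTask API. */
    interface SketchTask {
        void poll();
        void stop();
    }

    /** A task whose poll() always fails and whose stop() is idempotent. */
    static class FailingTask implements SketchTask {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        final List<String> events = new ArrayList<>();

        @Override
        public void poll() {
            throw new RuntimeException("poll failed");
        }

        @Override
        public void stop() {
            // compareAndSet lets only the first call do the cleanup,
            // so later calls are harmless no-ops.
            if (stopped.compareAndSet(false, true)) {
                events.add("resources released");
            }
        }
    }

    /** Hypothetical worker that mirrors the doRun()/close() shape described above. */
    static class SketchWorker {
        private final SketchTask task;

        SketchWorker(SketchTask task) {
            this.task = task;
        }

        /** External stop request, e.g. when the connector is stopped. */
        void stop() {
            task.stop();
        }

        /** Assumed change: close() also stops the task. */
        void close() {
            task.stop();
        }

        void doRun() {
            try {
                task.poll();
            } finally {
                // Runs even when poll() throws, so the task can clean up.
                close();
            }
        }
    }

    public static void main(String[] args) {
        FailingTask task = new FailingTask();
        SketchWorker worker = new SketchWorker(task);
        try {
            worker.doRun();
        } catch (RuntimeException e) {
            System.out.println("poll() threw: " + e.getMessage());
        }
        worker.stop(); // second stop() call; cleanup is not repeated
        System.out.println(task.events);
    }
}
```

In this sketch the cleanup runs once even though {{stop()}} is reached twice, which is the property the proposed change depends on. A task whose {{stop()}} is not idempotent would need its own guard.
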
> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case
> an exception has been raised in {{poll()}}. That's not the case, though. As
> an example see the connector and task below.
>
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful
> action to take, as it'll allow the task to clean up any resources such as
> releasing any database connections, right after that failure and not only
> once the connector is stopped.
>
> {code}
> package com.example;
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
>
> public class TestConnector extends SourceConnector {
>
>     @Override
>     public String version() {
>         return null;
>     }
>
>     @Override
>     public void start(Map<String, String> props) {
>     }
>
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>
>     @Override
>     public void stop() {
>     }
>
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>
>     public static class TestTask extends SourceTask {
>
>         @Override
>         public String version() {
>             return null;
>         }
>
>         @Override
>         public void start(Map<String, String> props) {
>         }
>
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)