[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374
 ] 

Robert Yokota edited comment on KAFKA-6566 at 5/11/18 6:04 PM:
---------------------------------------------------------------

I've just started looking at this, but it looks like the correct place to put 
the the {{task.stop()}} is in {{WorkerSourceTask.close()}}.  This would mirror 
the call to {{task.stop()}} in {{WorkerSinkTask.close()}}.   {{close()}} is 
called in a finally block in {{WorkerTask.doRun()}} here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176

There is another possible change I am looking at and that is to put a call to 
{{task.stop()}} in {{WorkerSinkTask.stop()}}.  This would mirror the call to 
{{task.stop()}} in {{WorkerSourceTask.stop()}}.

Ideally the source and sink would be symmetrical in order to make it easier to 
reason about esp. for Connect developers.  The above changes assume that 
{{task.stop()}} is idempotent for both the source and sink.






was (Author: rayokota):
I've just started looking at this, but it looks like the correct place to put 
the the `task.stop()` is in `WorkerSourceTask.close()`.  This would mirror the 
call to `task.stop()` in `WorkerSinkTask.close()`.   `close()` is called in a 
finally block in `WorkerTask.doRun()` here:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176

There is another possible change I am looking at and that is to put a call to 
`task.stop()` in `WorkerSinkTask.stop()`.  This would mirror the call to 
`task.stop()` in `WorkerSourceTask.stop()`.

Ideally the source and sink would be symmetrical in order to make it easier to 
reason about esp. for Connect developers.  The above changes assume that 
`task.stop()` is idempotent for both the source and sink.





> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to