[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366411#comment-16366411
 ] 

Gunnar Morling commented on KAFKA-6566:
---------------------------------------

+1 for more exploration whether calling stop() was the original intention. 
FWIW, several comments in Debezium indicate that this at least was the 
asumption on that side when the code was written (of course that assumption may 
have been right or wrong).

Note that calling stop() would be the only chance for the task to clean up its 
resources after an exception on the KC side of the polling loop (while we could 
have our entire polling logic in a try/catch block, there's no chance to react 
to exceptions in WorkerSourceTask).

> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to