[ https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava updated KAFKA-6566:
-----------------------------------------
    Fix Version/s: 1.1.1
                   1.0.2
                   0.11.0.3

> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Assignee: Robert Yokota
>            Priority: Blocker
>             Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Having discussed this with [~rhauch], it has been my assumption that
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case
> an exception has been raised in {{poll()}}. That's not the case, though. As
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful
> action to take, as it'll allow the task to clean up any resources such as
> releasing any database connections, right after that failure and not only
> once the connector is stopped.
> {code}
> package com.example;
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
>
> public class TestConnector extends SourceConnector {
>
>     @Override
>     public String version() {
>         return null;
>     }
>
>     @Override
>     public void start(Map<String, String> props) {
>     }
>
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>
>     @Override
>     public void stop() {
>     }
>
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>
>     public static class TestTask extends SourceTask {
>
>         @Override
>         public String version() {
>             return null;
>         }
>
>         @Override
>         public void start(Map<String, String> props) {
>         }
>
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
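
The behaviour requested in the report can be illustrated with a small, self-contained sketch that does not depend on Kafka. It is not the Kafka Connect worker code and not the fix that was committed for this issue. The `PollingTask` interface, the `runTask` method, and the `events` list are hypothetical names introduced only for this illustration. The sketch assumes a runner that calls `poll()` in a loop and places `stop()` in a `finally` block, so cleanup happens right after a failure in `poll()`.

```java
import java.util.ArrayList;
import java.util.List;

public class StopAfterPollFailureSketch {

    /** Hypothetical stand-in for the poll()/stop() part of a source task. */
    interface PollingTask {
        List<String> poll() throws InterruptedException;
        void stop();
    }

    /** Records what happened, in order, so the call sequence can be inspected. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical runner: polls until null is returned or poll() fails. */
    static void runTask(PollingTask task) {
        try {
            while (true) {
                List<String> records = task.poll();
                if (records == null) {
                    break; // assumption for this sketch: null ends the loop
                }
                events.add("polled " + records.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            events.add("interrupted");
        } catch (RuntimeException e) {
            events.add("poll failed");
        } finally {
            // Runs on normal exit and after any exception from poll(),
            // so the task can release its resources immediately.
            task.stop();
        }
    }

    public static void main(String[] args) {
        runTask(new PollingTask() {
            @Override
            public List<String> poll() {
                throw new RuntimeException("simulated failure in poll()");
            }

            @Override
            public void stop() {
                events.add("stop() called");
            }
        });
        System.out.println(events); // prints [poll failed, stop() called]
    }
}
```

With this structure, a task like the `TestTask` reproducer above would see its `stop()` invoked directly after the exception, rather than only when the connector is stopped.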